version_conflict_engine_exception with _update_by_query

I use the Elasticsearch update-by-query API from Flink, with Flink parallelism set to 1, but I get a version_conflict_engine_exception. This is the code in my Flink RichSinkFunction:

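        UpdateByQueryRequestBuilder builder = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
        // Abort the request when a version conflict is hit (this is also the default).
        builder.abortOnVersionConflict(true);
        builder.source(indexName);
        builder.filter(filter);
        // Retries apply to rejected bulk requests, not to version conflicts.
        builder.setMaxRetries(MAX_RETRIES);
        // Refresh the affected shards once the request finishes.
        builder.refresh(true);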

        String updateTime = Instant.ofEpochMilli(ts).atZone(ZoneId.systemDefault())
                .format(ELASTIC_SEARCH_DATE_TIME_FORMATTER);

        Map<String, Object> params = Maps.newHashMap();
        params.put("fieldName", fieldName);
        params.put("updateTime", updateTime);
        params.put("model", this.transformMap(JacksonUtils.convertValue(model, new TypeReference<Map<String, Object>>() {
        })));

        builder.script(new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, UPDATE_BY_MODEL_PAINLESS_CODE, params));
        BulkByScrollResponse response = builder.get();

I am sure that only this application accesses Elasticsearch. With Flink parallelism set to 1, the update-by-query API should be invoked as if from a single thread, so why do I get a version_conflict_engine_exception? And how can I achieve exactly-once?

There is 1 answer

Answered by David Anderson

I see two possibilities:

  1. Something else is running that can update the document.
  2. Flink's Elasticsearch sink provides at-least-once guarantees, meaning that in the event of failure, the sink will sometimes perform duplicate writes during recovery. Perhaps this can result in attempts to update a document using an out-of-date version number.
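
To make the second possibility concrete, here is a minimal sketch in plain Java. It is a toy in-memory model of version-checked writes, not the Elasticsearch client API: `VersionConflictSketch`, `Store`, `updateIfVersion` and `applyIdempotent` are hypothetical names made up for illustration. It shows a replayed write that still carries an old version number being rejected, and one way an at-least-once sink could make replays harmless, assuming every event carries a timestamp that can be compared with the one already stored.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of version-checked writes. Hypothetical names; NOT the Elasticsearch API.
class VersionConflictSketch {

    /** Thrown when a write carries a version that no longer matches the stored one. */
    static class VersionConflictException extends RuntimeException {
        VersionConflictException(String message) {
            super(message);
        }
    }

    /** A stored document: its value, the timestamp of the event that wrote it, and a version counter. */
    static final class Doc {
        final String value;
        final long eventTs;
        final long version;

        Doc(String value, long eventTs, long version) {
            this.value = value;
            this.eventTs = eventTs;
            this.version = version;
        }
    }

    static final class Store {
        private final Map<String, Doc> docs = new HashMap<>();

        Doc get(String id) {
            return docs.get(id);
        }

        /** Creates (or overwrites) a document at version 1. */
        void index(String id, String value, long eventTs) {
            docs.put(id, new Doc(value, eventTs, 1L));
        }

        /** Writes only if expectedVersion equals the stored version; otherwise reports a conflict. */
        void updateIfVersion(String id, long expectedVersion, String value, long eventTs) {
            Doc current = docs.get(id);
            if (current == null || current.version != expectedVersion) {
                throw new VersionConflictException(
                        "expected version " + expectedVersion + " for document " + id);
            }
            docs.put(id, new Doc(value, eventTs, current.version + 1));
        }

        /**
         * Idempotent variant: re-reads the current version and skips events that are
         * not newer than what is already stored. Returns true if a write happened.
         */
        boolean applyIdempotent(String id, String value, long eventTs) {
            Doc current = docs.get(id);
            if (current == null) {
                index(id, value, eventTs);
                return true;
            }
            if (eventTs <= current.eventTs) {
                return false; // replayed or out-of-date event: nothing to do
            }
            updateIfVersion(id, current.version, value, eventTs);
            return true;
        }
    }

    public static void main(String[] args) {
        Store store = new Store();
        store.index("doc-1", "a", 100L);
        long staleVersion = store.get("doc-1").version;

        // First attempt succeeds and bumps the version.
        store.updateIfVersion("doc-1", staleVersion, "b", 200L);

        // A replay after recovery reuses the old version number and is rejected.
        try {
            store.updateIfVersion("doc-1", staleVersion, "b", 200L);
        } catch (VersionConflictException e) {
            System.out.println("conflict: " + e.getMessage());
        }

        // The idempotent path recognises the replay and skips it instead of failing.
        System.out.println("replay applied: " + store.applyIdempotent("doc-1", "b", 200L));
    }
}
```

The point of the idempotent variant is that exactly-once delivery is not needed if applying the same event twice leaves the document unchanged. Whether that fits your case depends on whether your Painless script can compare `updateTime` against the stored value, which I am only assuming here.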