How can I wait until all items are consumed?


My task is simple: I need to query a rolling Elasticsearch index in a reactive way.

The @Document annotation didn't seem to support an index name built with Spring EL, like @Document(index = "indexName-#(new Date().format(yyyy-MM-dd))").

So I am trying to call Elasticsearch through ReactiveElasticsearchTemplate, which lets me change the index name at runtime.

But since the data volume is larger than 10,000 documents, I need to use scroll queries to keep fetching until we get all the data.

I have finished the initial query and the scroll query, and both return values.

But I need to combine all the results and then return them.

How can I do that? Right now, while the consumer is still working, an empty result has already been returned to the frontend. How can I make the thread wait until the consumer finishes and Elasticsearch has returned all the data? Thanks.

public Flux<ELKModel> getByTradeDateBetween(LocalDateTime from, LocalDateTime to)
    throws Exception {
  List<ELKModel> result = new ArrayList<ELKModel>();
  List<Long> total = new ArrayList<>();
  List<Long> currentSize = new ArrayList<>();
  List<String> scrollId = new ArrayList<>();

  NativeSearchQueryBuilder sourceBuilder = new NativeSearchQueryBuilder();
  sourceBuilder.withQuery(
      QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery(TRADE_DATE).gte(from).lte(to)));
  sourceBuilder.withPageable(PageRequest.of(0, SINGLE_QUERY_SIZE));
  NativeSearchQuery query = sourceBuilder.build();

  elasticsearchSupport
      .scrollStart(query, ELKModel.class)
      .map(ELKModelWrapper::valueFrom)
      .subscribe(
          wrapper -> {
            total.add(wrapper.getTotal());
            currentSize.add(wrapper.getCurrentSize());
            result.addAll(wrapper.getResults());
            scrollId.add(wrapper.getScrollId());
          })
      .dispose();

  while (currentSize.size() == 1 && total.size() == 1 && currentSize.get(0) < total.get(0)) {
    elasticsearchSupport
        .scrollContinue(scrollId.get(0), ELKModel.class)
        .map(ELKModelWrapper::valueFrom)
        .subscribe(
            wrapper -> {
              currentSize.add(0, currentSize.get(0) + wrapper.getCurrentSize());
              result.addAll(wrapper.getResults());
              scrollId.add(0, wrapper.getScrollId());
            })
        .dispose();
  }

  return Flux.fromIterable(result);
}


There are 2 answers

P.J.Meisch (accepted answer)

You must be using a pretty outdated version of Spring Data Elasticsearch. SpEL support for index names in the @Document annotation was introduced 4 years ago; you can see how it is used in my post at https://www.sothawo.com/2020/07/how-to-provide-a-dynamic-index-name-in-spring-data-elasticsearch-using-spel/.
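With a current version, the index name can be resolved through a SpEL expression that calls into a Spring bean; a minimal sketch follows, where the bean name indexNameProvider, the method todayIndex(), and the "trades-" prefix are assumptions for illustration, not names from the original post:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// A plain bean whose method would be referenced from the entity's annotation,
// roughly: @Document(indexName = "#{@indexNameProvider.todayIndex()}")
// Spring evaluates the SpEL expression when the index name is needed,
// so the date is computed at runtime rather than at class-load time.
public class IndexNameProvider {

    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd");

    // Builds a daily rolling index name, e.g. "trades-2024-05-01".
    public String todayIndex() {
        return "trades-" + LocalDate.now().format(FMT);
    }

    public static void main(String[] args) {
        System.out.println(new IndexNameProvider().todayIndex());
    }
}
```

The bean only needs to be registered in the application context so the `@indexNameProvider` SpEL reference can resolve it.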

As for the reactive part: you must never block a thread in reactive code. And the reactive code already does this scrolling under the hood; I don't see why you want to do it yourself.

Malvin Lok

Since you're using a reactive approach with Flux, you should not block with the while statement. Instead, you should chain your reactive operations.

expand is used to perform repeated queries until we have collected all the required data, applying the scrollContinue operation only while there is still data left according to the comparison of total and current size.

Here is a modification of your code, though it is not guaranteed to be runnable.

public Flux<ELKModel> getByTradeDateBetween(LocalDateTime from, LocalDateTime to) {

    NativeSearchQueryBuilder sourceBuilder = new NativeSearchQueryBuilder();
    sourceBuilder.withQuery(
            QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery(TRADE_DATE).gte(from).lte(to)));
    sourceBuilder.withPageable(PageRequest.of(0, SINGLE_QUERY_SIZE));
    NativeSearchQuery query = sourceBuilder.build();

    return elasticsearchSupport.scrollStart(query, ELKModel.class)
            .map(ELKModelWrapper::valueFrom)
            .expand(wrapper -> {
                // assumes getCurrentSize() reports the cumulative number of hits fetched so far
                if (wrapper.getTotal() > wrapper.getCurrentSize()) {
                    return elasticsearchSupport.scrollContinue(wrapper.getScrollId(), ELKModel.class)
                            .map(ELKModelWrapper::valueFrom);
                } else {
                    return Flux.empty();
                }
            })
            .flatMap(wrapper -> Flux.fromIterable(wrapper.getResults()));
}
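The recursion expand performs can be pictured with a synchronous sketch that has no Elasticsearch or Reactor dependency; the expand helper, the page size of 3, and the total of 8 hits below are illustrative assumptions, not part of the answer above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Synchronous analogue of Reactor's expand(): starting from a seed value,
// repeatedly apply `next` and collect every value until `next` returns null
// (standing in for Flux.empty(), which terminates the expansion).
public class ExpandSketch {

    static <T> List<T> expand(T seed, Function<T, T> next) {
        List<T> all = new ArrayList<>();
        T current = seed;
        while (current != null) {
            all.add(current);        // emit the current "page"
            current = next.apply(current); // fetch the next one, or stop
        }
        return all;
    }

    public static void main(String[] args) {
        // Simulate scrolling over 8 total hits in pages of 3:
        // pages start at offsets 0, 3 and 6, then the expansion stops.
        int total = 8, pageSize = 3;
        List<Integer> pageStarts = expand(0,
                start -> start + pageSize < total ? start + pageSize : null);
        System.out.println(pageStarts); // [0, 3, 6]
    }
}
```

Each emitted value is both delivered downstream and fed back into the expansion function, which is exactly how the scroll id of one page seeds the request for the next.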