I have a data service and I am seriously considering switching to a reactive model. This is a federated query engine that can resolve data for queries by invoking one or more "resolver" implementations, depending on the query type.
If I switch to spring-data-mongodb-reactive
, then each of these implementations would have to create a number of Flux
instances for:
- the queries for different parts of the information
- querying all databases for each query from #1
Note: I don't want to combine every Flux
because being able to keep the queries for #1 above separate make the final processing much easier. Combining each "part" query for all federated databases would be fine, but I have to keep the data for each "part" separate. I hope that makes sense.
Explaining the full workflow is out of the scope of this post, but I am wondering how I can create any number of Flux
instances, and subscribe to them to get them started, but then wait until they all complete before proceeding with the processing of the fully-retrieved data across all federated sources. In Java, I am looking for something that is analogous to a CompletableFuture.allOf()
.
Am I even close to being on the right track if I do something like this:
public class ReactiveDataService {
private static final Supplier<Example<String>> example1 = () -> Example.of("Example 1");
private static final Supplier<Example<String>> example2 = () -> Example.of("Example 2");
private static final Supplier<Example<String>> example3 = () -> Example.of("Example 3");
private static final Supplier<Example<String>> example4 = () -> Example.of("Example 4");
private static final Supplier<Example<String>> example5 = () -> Example.of("Example 5");
private final Collection<ReactiveMongoRepository<String, String>> repositories;
public ReactiveDataService(Collection<ReactiveMongoRepository<String, String>> repositories) {
this.repositories = repositories;
}
private void processFluxes(final Flux<String> flux1, final Flux<String> flux2, final Flux<String> flux3,
final Flux<String> flux4, final Flux<String> flux5) {
// Call service to process flux stuff
}
/**
* For all repositories, combine fluxes that run the same query.
* Subscribe to each flux immediately to get the query started.
* Add all fluxes to a container flux that processes the results
* upon completion.
* After everything is set up, block until completion.
*/
public void doQuery() {
final Flux<String> flux1 = Flux.fromIterable(repositories)
.flatMap(repo -> repo.findAll(example1.get()));
flux1.subscribe();
final Flux<String> flux2 = Flux.fromIterable(repositories)
.flatMap(repo -> repo.findAll(example2.get()));
flux2.subscribe();
final Flux<String> flux3 = Flux.fromIterable(repositories)
.flatMap(repo -> repo.findAll(example3.get()));
flux3.subscribe();
final Flux<String> flux4 = Flux.fromIterable(repositories)
.flatMap(repo -> repo.findAll(example4.get()));
flux4.subscribe();
final Flux<String> flux5 = Flux.fromIterable(repositories)
.flatMap(repo -> repo.findAll(example5.get()));
flux5.subscribe();
final Flux<Flux<String>> fluxes = Flux.just(flux1, flux2, flux3, flux4, flux5)
.doOnComplete(() -> processFluxes(flux1, flux2, flux3, flux4, flux5));
fluxes.blockLast();
}
}
Here is an example of how you can do using Mono.zip:
which outputs:
So you can adapt it to your situation.
Note that Mono.zip cannot return null, that's why I put "output" as a result, but if you don't need any output you can put whatever you want which is not null.
The idea is first to convert every
Flux<String>
intoMono<List<String>>
by usingcollectList
, it will be simpler then to process.Mono.zip
allows you to wait for all to be done, and process the output asObject[]
. You can convert each object into aList<String>
for processing.