How to repeat or retry conditionally with reactor

3.4k views Asked by At

I use SpringBoot and reactive programming with Webflux. I want to repeat the service until data is available(something is returned apart from null)

I have a service which insert some data into databse and there is 2nd service which consumes the data. I want to keep on querying the database from 2nd service untill the data is avaiable into it. Below code I am trying to achieve this using Project Reactor:

Mono<SubscriptionQueryResult<App, App>> subscriptionQuery = reactiveQueryGateway
.subscriptionQuery(new FindAppByIdQuery(appId), ResponseTypes.instanceOf(App.class), ResponseTypes.instanceOf(App.class));

subscriptionQuery
  .filter(a -> Objects.nonNull(a.initialResult().block()))
  .repeatWhen(Repeat.onlyIf(repeatContext -> true)
  .exponentialBackoff(Duration.ofMillis(100), Duration.ofSeconds(100))
  .timeout(Duration.ofSeconds(30))).subscribe();

while executing this i am getting below exception:

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1

while going through webflux documentation i found that Calling block() function isn’t possible within Reactor thread. Such an attempt causes the above error:

To overcome that i tried below:

subscriptionQuery
 .flatMap(a -> a.initialResult())
 .filter(a -> Objects.nonNull(a))
 .repeatWhen(Repeat.onlyIf(repeatContext -> true)
 .exponentialBackoff(Duration.ofMillis(100), Duration.ofSeconds(100))
 .timeout(Duration.ofSeconds(30)))
 .subscribe();

but it is not giving me the desired result, i think i am missing something. can anyone please suggest the correct way to achieve this.

Thanks.

2

There are 2 answers

0
Lucas Campos On

let me try to help you on this issue.

In fact, the best way to tackle is to subscribe to it before sending the command. In this way, you know when the update is emitted by the subscription query.

We have a code-samples that can help you on this.

To expand on that, the part that you are most interested should be this one on CommandController:

public <U> Mono<U> sendAndReturnUpdate(Object command, SubscriptionQueryResult<?, U> result) {
    /* The trick here is to subscribe to initial results first, even it does not return any result
     Subscribing to initialResult creates a buffer for updates, even that we didn't subscribe for updates yet
     they will wait for us in buffer, after this we can safely send command, and then subscribe to updates */
    return Mono.when(result.initialResult())
               .then(Mono.fromCompletionStage(() -> commandGateway.send(command)))
               .thenMany(result.updates())
               .timeout(Duration.ofSeconds(5))
               .next()
               .doFinally(unused -> result.cancel());
    /* dont forget to close subscription query on the end and add a timeout */
}
0
emvidi On

I don't know if I am understanding correctly your issue but this is what I use when sending a command and querying the result (qryGateway is ReactorQueryGateway):

qryGateway.queryUpdates(query, AuthenticationResult.class)
          .mergeWith(cmdGateway.send(command))
          .filter(s -> s instanceof AuthenticationResult)
          .next()
          .timeout(Duration.ofSeconds(2))
          .subscribeOn(Schedulers.elastic());

I am learning axon and reactor at the moment so I am no expert.