How to make a command wait until all events triggered by it have been handled successfully

I have come across a requirement where I want Axon to wait until all events on the event bus that were fired for a particular command have finished executing. I will briefly describe the scenario:

I have a RestController which fires the command below to create an application entity:

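@RestController
class myController{
  // org.axonframework.commandhandling.gateway.CommandGateway, injected by Spring
  @Autowired
  private CommandGateway commandGateway;

  @PostMapping("/create")
  @ResponseBody
  public String create() {
    commandGateway.sendAndWait(new CreateApplicationCommand());
    System.out.println("in myController:: after sending CreateApplicationCommand");
    return "created"; // placeholder response, not part of the original snippet
  }
}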

This command is handled in the aggregate. The aggregate class is annotated with org.axonframework.spring.stereotype.Aggregate:

@Aggregate
class MyAggregate{
   @CommandHandler //org.axonframework.commandhandling.CommandHandler
   private MyAggregate(CreateApplicationCommand command) {
      org.axonframework.modelling.command.AggregateLifecycle.apply(new AppCreatedEvent());
      System.out.println("in MyAggregate:: after firing AppCreatedEvent");
   }

   @EventSourcingHandler //org.axonframework.eventsourcing.EventSourcingHandler
   private void on(AppCreatedEvent appCreatedEvent) {
      // Updates the state of the aggregate
      this.id = appCreatedEvent.getId();
      this.name = appCreatedEvent.getName();
      System.out.println("in MyAggregate:: after updating state");
   }
}

The AppCreatedEvent is handled in two places:

  1. In the aggregate itself, as shown above.
  2. In the projection class, as below:

 @EventHandler //org.axonframework.eventhandling.EventHandler
 void on(AppCreatedEvent appCreatedEvent){
    // persists into database
    System.out.println("in Projection:: after saving into database");
 }

The problem is that once the event has been handled in the first place (i.e., inside the aggregate), control returns to myController, before the projection has handled it. The output is:

in MyAggregate:: after firing AppCreatedEvent
in MyAggregate:: after updating state
in myController:: after sending CreateApplicationCommand
in Projection:: after saving into database

The output I want is:

in MyAggregate:: after firing AppCreatedEvent
in MyAggregate:: after updating state
in Projection:: after saving into database
in myController:: after sending CreateApplicationCommand

In simple words, I want Axon to wait until all events triggered by a particular command have been handled completely, and only then return to the class that dispatched the command.

After searching the forum, I learned that all sendAndWait does is wait until the handling of the command and the publication of the events is finalized. I then tried the Reactor Extension as shown below, but got the same result:

org.axonframework.extensions.reactor.commandhandling.gateway.ReactorCommandGateway.send(new CreateApplicationCommand()).block();

Can someone please help me out? Thanks in advance.

1 Answer

Answered by Steven

What would be best in your situation, @rohit, is to embrace the fact you are using an eventually consistent solution here. Thus, Command Handling is entirely separate from Event Handling, making the Query Models you create eventually consistent with the Command Model (your aggregates). Therefore, you wouldn't necessarily wait for the events exactly but react when the Query Model is present.

Embracing this comes down to building your application such that "yeah, I know my response might not be up to date now, but it might be somewhere in the near future." It is thus recommended to subscribe to the result you are interested in, either before or after you dispatch the command. For example, you could push updates to the client using WebSockets with the STOMP protocol, or you could tap into Project Reactor and use the Flux result type to receive the results as they arrive.

From your description, I assume you or your business have decided that the UI component should react in the (old-fashioned) synchronous way. There's nothing wrong with that, but it will bite your *ss when it comes to using something inherently eventually consistent like CQRS. You can, however, spoof the fact you are synchronous in your front-end, if you will.

To achieve this, I would recommend using Axon's Subscription Query to subscribe to the query model you know will be updated by the command you will send. In pseudo-code, that would look a little bit like this:

public Result mySynchronousCall(String identifier) {
    // Subscribe to the updates to come (type parameters: initial result type, update type)
    SubscriptionQueryResult<Result, Result> result = queryGateway.subscriptionQuery(...);
    // Issue command to update
    commandGateway.send(...);
    // Wait on the Flux for the first result, and then close it
    return result.updates()
                 .next()
                 .map(...)
                 .timeout(...)
                 .doFinally(it -> result.close())
                 .block(); // block so the method returns a Result rather than a Mono<Result>
}

You could see this being done in this sample WebFluxRest class, by the way.
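To make the ordering concrete without needing an Axon setup, here is a minimal plain-JDK sketch of the same subscribe-then-send idea. It is not Axon code: `SubscribeThenSendSketch`, `subscribe`, `send`, and `createAndWait` are hypothetical names, the single-threaded executor merely stands in for an asynchronous event processor, and the `CompletableFuture` stands in for the subscription query's update stream. In a real Axon application the projection would have to publish the update itself (typically through a `QueryUpdateEmitter`) for the subscriber to see anything.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-JDK stand-in for the subscribe-then-send pattern.
// None of these types are Axon APIs; all names are hypothetical.
final class SubscribeThenSendSketch {

    // Stand-in for the update channel: one pending waiter per identifier.
    private final Map<String, CompletableFuture<String>> waiters = new ConcurrentHashMap<>();

    // Stand-in for an asynchronous event processor (daemon thread so the JVM can exit).
    private final ExecutorService eventProcessor = Executors.newSingleThreadExecutor(task -> {
        Thread thread = new Thread(task, "projection");
        thread.setDaemon(true);
        return thread;
    });

    // Step 1: register interest in the query-model update for this identifier.
    CompletableFuture<String> subscribe(String id) {
        return waiters.computeIfAbsent(id, key -> new CompletableFuture<>());
    }

    // Step 2: "send the command". The projection runs later, on another thread.
    void send(String id, String name) {
        eventProcessor.submit(() -> {
            String row = id + ":" + name; // pretend this row was saved to the database
            CompletableFuture<String> waiter = waiters.remove(id);
            if (waiter != null) {
                waiter.complete(row); // notify whoever subscribed
            }
        });
    }

    // Subscribe first, then send, then block until the update arrives or the timeout expires.
    String createAndWait(String id, String name, long timeoutMillis) {
        CompletableFuture<String> update = subscribe(id);
        send(id, name);
        try {
            return update.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the projection", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Projection was not updated in time", e);
        } finally {
            waiters.remove(id); // counterpart of closing the subscription
        }
    }

    void shutdown() {
        eventProcessor.shutdown();
    }
}
```

Calling `new SubscribeThenSendSketch().createAndWait("app-1", "demo", 2000)` returns `"app-1:demo"` only after the simulated projection has run, which is the ordering asked for in the question. The timeout matters: if the projection fails or lags, the caller gets an error instead of hanging forever.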

Note that you are essentially closing the door to the front-end to tap into the asynchronous goodness by doing this. It'll work and allow you to wait for the result to be there as soon as it is there, but you'll lose some flexibility.