Axon QueryHandler timing problem, how to use initialResult() and updates() so i will only invoke one response

45 views Asked by At

I have a question about the QueryGateway.subscribtionQuery() We have a Query for an aanvraagId, which will respond with the corresponding referentschapId.

    public void verwerkBeperkingErkenningsdoelGematcht(UUID aanvraagId,
                                                       UUID organisatieId,
                                                       UUID persoonId,
                                                       UUID erkenningId,

                                                       String grondslag,
                                                       Beperking beperking) {

        queryGateway.subscriptionQuery(FetchReferentschapAanvraagView.builder().aanvraagId(aanvraagId).build(),
                        ResponseTypes.instanceOf(UUID.class),
                        ResponseTypes.instanceOf(UUID.class)
        ).handle(referentschapId -> {
commandGateway.sendAndWait(RegistreerErkenningGrondslagEnBeperkingBijReferentschap.builder()
                        .referentschapId(referentschapId)
                        .organisatieId(organisatieId)
                        .persoonId(persoonId)
                        .erkenningId(erkenningId)
                        .grondslag(grondslag)
                        .beperking(beperking)
                        .build()
        );
                    },
                referentschapId -> {
commandGateway.sendAndWait(RegistreerErkenningGrondslagEnBeperkingBijReferentschap.builder()
                        .referentschapId(referentschapId)
                        .organisatieId(organisatieId)
                        .persoonId(persoonId)
                        .erkenningId(erkenningId)
                        .grondslag(grondslag)
                        .beperking(beperking)
                        .build()
        );
                }
        );
    }

The QueryHandler looks like :

    @QueryHandler
    public Optional<UUID> getReferentschapIdVoorAanvraagId(FetchReferentschapAanvraagView fetchQuery) {
        log.info("getReferentschapIdVoorAanvraagId: %s".formatted(fetchQuery));
        return referentschapAanvraagViewRepository.findByAanvraagId(fetchQuery.getAanvraagId())
                                                  .map(ReferentschapAanvraagView::getReferentschapId);
    }

public class FetchReferentschapAanvraagView {

    private UUID aanvraagId;
}

and the Axon EventHandler:

    @EventHandler
    public void referentschapGemaakt(ReferentschapGemaakt event) {
        referentschapAanvraagViewRepository.findById(event.getReferentschapId())
                                           .ifPresentOrElse(referentschapAanvraagView ->
                                                           log.warn(ER_BESTAAT_AL_EEN_REFERENTSCHAP_MET_ID_EN_AANVRAAG_ID
                                                                   .formatted(referentschapAanvraagView.getReferentschapId(),
                                                                           referentschapAanvraagView.getAanvraagId())
                                                           ),
                                                   () -> referentschapAanvraagViewRepository.save(ReferentschapAanvraagView.builder()
                                                           .referentschapId(event.getReferentschapId())
                                                           .aanvraagId(event.getAanvraagId())
                                                           .build()
                                                   )
                                           );

        queryUpdateEmitter.emit(FetchReferentschapAanvraagView.class, query -> event.getAanvraagId().equals(query.getAanvraagId()), event.getReferentschapId());
    }

The query is for a repository with the table:

@Entity
public class ReferentschapAanvraagView {

    @Id
    @Column(columnDefinition = "uuid")
    private UUID referentschapId;
    @Column(columnDefinition = "uuid")
    private UUID aanvraagId;

}

Now is our situation as follows:

  • 1st axon will get a command A to create an entry with aanvraagId=1 & referentschapId=2 and is stored in table ReferentschapAanvraagView with an AxonEvent and a queryUpdateEmitter.emit is performed.

  • 2nd axon will do a Query QA with FetchReferentschapAanvraagView.class to retrieve the referentschapId for a given aanvraagId. with the retrieved referentschapId a command B is given.

  • 3th axon will do for 2nd time the Query QA to retrieve the referentschapId for a given aanvraagId. with the retrieved referentschapId a command C is given.

  • 4th axon will do for 3th time the Query QA to retrieve the referentschapId for a given aanvraagId. with the retrieved referentschapId a command D is given.

Step 1 to 4 use al the same aanvraagId, and has one corresponding random generated referentschapId thus commands B,C and D use same referentschapId.
The commands B, C and D must only be executed once with the retrieved referentschapId.

(?) How can we quaranty that the commands B,C and D are only executed once?

The problem is that when we do the Query's QA whe don't know if the new entry in ReferentschapAanvraagView is saved in 1st step before the Queries FetchReferentschapAanvraagView for commands B,C and are performed. It could that the entry is saved before, during and after the queries. How can we achieve a query to handle this? How we interpret the queryGateway.subscriptionQuery().initialResult() and the queryGateway.subscriptionQuery().updates()

  • initialResult(): goes to the QueryHandler, reads table for aanvraagId, if data is saved for aanvraagId then return this data, return null otherwise.
  • updates().subscruption(): responds every time an emit to the Query is given, in our case only once. So we where thinking to do an initialResult() if data present send command, if not do .updates().subscribe() Is this gonna work and how do we fix the subscription to execute only once?
0

There are 0 answers