Aggregated Future/Response for SQS Acks/Error on multiple queues with Spring Integration SQS


I am using a gateway, and inside the loop below each call to the gateway sends the message to 2 queues via sqs-outbound-channel-adapters.

I want to achieve something like this:

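List<Future<AggregatedAckorErrorFrom2Queues>> futures = new ArrayList<>();
jsonArray.forEach(data -> {
    // Method name matches the <int:method name="publishToDataService"> in the config
    Future<AggregatedAckorErrorFrom2Queues> result = dataPublishGateway.publishToDataService(data);
    futures.add(result);
});
for (Future<AggregatedAckorErrorFrom2Queues> f : futures) {
    try {
        // get() blocks until the aggregated result from both queues is available
        System.out.println("Future Result -> " + f.get());
    } catch (InterruptedException | ExecutionException e) {
        throw new IllegalStateException(e);
    }
}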

The Gateway and SQS Adapter Config

<!-- Gateway to Publish to SQS -->
<task:executor id="dataExecutor" pool-size="5"/>
<int:publish-subscribe-channel id="dataChannel" task-executor="dataExecutor" />
<int:gateway service-interface="com.integration.gateway.PublishGateway" id="dataPublishGateway">
    <int:method name="publishToDataService" payload-expression="#args[0]" request-channel="dataChannel" />
</int:gateway>

<!-- Send to 2 Queues -->

    <int:channel id="successChannel"/>
    <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
                                          queue="queue-1"
                                          channel="dataChannel"
                                          success-channel="successChannel"
                                          failure-channel="errorChannel"/>
    <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
                                          queue="queue-2"
                                          channel="dataChannel"
                                          success-channel="successChannel"
                                          failure-channel="errorChannel"/>

I am looking at apply-sequence on the dataChannel and an aggregator, but the aggregator must be able to process both ack and error.

Question: How can I return an aggregated response (ack + error) from the 2 queues back to the gateway?

Answer by Artem Bilan (best answer)

You are on the right track with the success-channel and failure-channel. The aggregator can subscribe to successChannel and do correlation and release using the default strategies, which rely on the correlation and sequence headers that apply-sequence populates.

For the failure-channel you probably need a custom error channel rather than the global errorChannel. The ErrorMessage sent to that channel from the SqsMessageHandler has an AwsRequestFailureException as its payload, and that exception's failedMessage contains the request message with the correlation details mentioned above. So you need to add a transformation step that extracts this information from the exception before the message proceeds to the aggregator.
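To make the correlation and release idea concrete, here is a minimal plain-Java sketch of what the aggregation step has to do. It deliberately does not use Spring Integration classes: AckErrorAggregator, Outcome, expect and onOutcome are hypothetical names invented for this illustration, and the correlation id and sequence size are assumed to be the values that apply-sequence would put in the message headers. In the real flow the aggregator endpoint would do this work, and errors would first be unwrapped from the exception as described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration only; not part of Spring Integration.
class AckErrorAggregator {

    // One outcome per queue: an ack (success = true) or an error (success = false).
    static final class Outcome {
        final String queue;
        final boolean success;
        final String detail;

        Outcome(String queue, boolean success, String detail) {
            this.queue = queue;
            this.success = success;
            this.detail = detail;
        }
    }

    // Outcomes collected so far for one correlation id.
    private static final class Group {
        final int expected;
        final List<Outcome> outcomes = new ArrayList<>();
        final CompletableFuture<List<Outcome>> future = new CompletableFuture<>();

        Group(int expected) {
            this.expected = expected;
        }
    }

    private final Map<String, Group> groups = new ConcurrentHashMap<>();

    // Registers a correlation id and how many outcomes to wait for (2 for two queues).
    CompletableFuture<List<Outcome>> expect(String correlationId, int sequenceSize) {
        Group group = new Group(sequenceSize);
        groups.put(correlationId, group);
        return group.future;
    }

    // Called for acks and for errors alike; releases the group once it is complete.
    void onOutcome(String correlationId, Outcome outcome) {
        Group group = groups.get(correlationId);
        if (group == null) {
            return; // unknown or already released correlation id
        }
        List<Outcome> released = null;
        synchronized (group) {
            group.outcomes.add(outcome);
            if (group.outcomes.size() == group.expected) {
                released = List.copyOf(group.outcomes);
            }
        }
        if (released != null) {
            groups.remove(correlationId);
            group.future.complete(released);
        }
    }
}
```

The point of the sketch is that acks and errors must arrive at the same place carrying the same correlation id; otherwise a group with one failed send would never reach its expected size and the future would never complete.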