I am using a Gateway, and through it I loop over my data and send each message to 2 queues via the sqs-outbound adapter.
I want to achieve something like this:
jsonArray.forEach(data -> {
Future<AggregatedAckorErrorFrom2Queues> result = dataPublishGateway.publishToDataService(data);
futures.add(result);
});
futures.forEach(f -> {
System.out.println("Future Result -> " + f.get());
});
The Gateway and SQS Adapter config:
<!-- Gateway to Publish to SQS -->
<task:executor id="dataExecutor" pool-size="5"/>
<int:publish-subscribe-channel id="dataChannel" task-executor="dataExecutor" />
<int:gateway service-interface="com.integration.gateway.PublishGateway" id="dataPublishGateway">
<int:method name="publishToDataService" payload-expression="#args[0]" request-channel="dataChannel" />
</int:gateway>
<!-- Send to 2 Queues -->
<int:channel id="successChannel"/>
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
queue="queue-1"
channel="dataChannel"
success-channel="successChannel"
failure-channel="errorChannel"/>
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
queue="queue-2"
channel="dataChannel"
success-channel="successChannel"
failure-channel="errorChannel"/>
I am looking at apply-sequence on the dataChannel and an aggregator, but the aggregator must be able to process both ack and error.
Question: How can I return an aggregated response (ack + error) from the 2 queues back to the gateway?
You are heading in the right direction with success-channel and failure-channel. The aggregator can subscribe to that successChannel and do correlation and release using the default strategies. For the failure-channel you probably need a custom error channel, not the global errorChannel. The ErrorMessage sent to that channel from the SqsMessageHandler has an AwsRequestFailureException as its payload, and its failedMessage contains the request message with the mentioned correlation details. So you need to add a transformation step that extracts that information from the exception before proceeding to the aggregator.
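To make the idea concrete, here is a minimal plain-Java sketch of the logic, not actual Spring Integration code. All types in it (Request, SendFailure, Outcome, Aggregator) are hypothetical stand-ins: Request models a message carrying the correlation id and sequence size that apply-sequence would provide, SendFailure models an exception that wraps the failed request, and Aggregator mimics release-on-sequence-size. In the real flow, the fromFailure step would presumably be a transformer on your custom failure channel that reads failedMessage from the exception.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of the flow. Every type here is a hypothetical stand-in,
// not a Spring Integration or spring-integration-aws class.
class AckErrorAggregationSketch {

    // Models a request message with the correlation details added by apply-sequence.
    static final class Request {
        final String correlationId;
        final int sequenceSize;
        final String queue;

        Request(String correlationId, int sequenceSize, String queue) {
            this.correlationId = correlationId;
            this.sequenceSize = sequenceSize;
            this.queue = queue;
        }
    }

    // Models the exception seen on the failure channel: it wraps the failed request.
    static final class SendFailure extends RuntimeException {
        final Request failedMessage;

        SendFailure(Request failedMessage, String reason) {
            super(reason);
            this.failedMessage = failedMessage;
        }
    }

    // Common shape for acks and errors, so a single aggregator can handle both.
    static final class Outcome {
        final Request request;
        final boolean success;
        final String detail;

        Outcome(Request request, boolean success, String detail) {
            this.request = request;
            this.success = success;
            this.detail = detail;
        }
    }

    // Success path: the request comes back as-is and maps directly to an Outcome.
    static Outcome fromAck(Request request) {
        return new Outcome(request, true, "ack");
    }

    // Failure path: the transformation step that pulls the request out of the exception.
    static Outcome fromFailure(SendFailure failure) {
        return new Outcome(failure.failedMessage, false, failure.getMessage());
    }

    // Minimal, non-thread-safe aggregator: groups by correlation id and
    // releases a group once sequenceSize outcomes have arrived.
    static final class Aggregator {
        private final Map<String, List<Outcome>> groups = new LinkedHashMap<>();

        Optional<List<Outcome>> add(Outcome outcome) {
            String key = outcome.request.correlationId;
            List<Outcome> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
            group.add(outcome);
            if (group.size() < outcome.request.sequenceSize) {
                return Optional.empty();
            }
            groups.remove(key);
            return Optional.of(group);
        }
    }

    public static void main(String[] args) {
        Aggregator aggregator = new Aggregator();
        Request toQueue1 = new Request("msg-1", 2, "queue-1");
        Request toQueue2 = new Request("msg-1", 2, "queue-2");

        // First outcome (an ack) only starts the group; nothing is released yet.
        aggregator.add(fromAck(toQueue1));
        // Second outcome (an error) completes the group, which is then released.
        Optional<List<Outcome>> released =
                aggregator.add(fromFailure(new SendFailure(toQueue2, "send failed")));
        released.ifPresent(group -> group.forEach(o ->
                System.out.println(o.request.queue + " -> " + (o.success ? "ack" : "error: " + o.detail))));
    }
}
```

The point of the common Outcome shape is that the aggregator never needs to know whether an item arrived via the success or the failure path; it only needs the correlation details, which is why the error has to be unwrapped first.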