I have two threads, and the method invoked in each thread throws an exception. How do I get all the errors thrown in each of the threads? In this code, the error channel catches only one of the errors. My goal is to catch all the errors and send them to the caller (a REST controller). Any help will be appreciated. Thanks.
Integration.java
public IntegrationFlow provisionUserFlow() {
    return IntegrationFlows.from("input.channel")
            .publishSubscribeChannel(Executors.newCachedThreadPool(),
                    s -> s
                            .subscribe(f -> f.handle(provisionerA, "provision"))
                            .subscribe(f -> f.handle(provisionerB, "provision")))
            .get();
}
@ServiceActivator(inputChannel = "errorChannel", outputChannel = "replyChannel")
public boolean processErrors(Exception message) throws RuntimeException {
    System.out.println("Message" + message.getMessage());
    System.out.println("******************************");
    throw new RuntimeException(message.getMessage());
}
MGateway.java
@MessagingGateway(errorChannel = "errorChannel")
public interface MGateway {
    @Gateway(requestChannel = "input.channel", replyChannel = "replyChannel")
    boolean invokeProvisioner(User user);
}
SOLUTION
@Bean
public IntegrationFlow provisionUserFlow() {
    return IntegrationFlows.from("input.channel")
            .publishSubscribeChannel(Executors.newCachedThreadPool(),
                    s -> s.applySequence(true)
                            .subscribe(f -> f.enrichHeaders(e -> e.header(MessageHeaders.ERROR_CHANNEL, "errorChannel", true))
                                    .handle(provisionerA, "provision")
                                    .channel("aggregatorChannel"))
                            .subscribe(f -> f.enrichHeaders(e -> e.header(MessageHeaders.ERROR_CHANNEL, "errorChannel", true))
                                    .handle(provisionerB, "provision")
                                    .channel("aggregatorChannel")))
            .get();
}
@Bean
public IntegrationFlow aggregateFlow() {
    return IntegrationFlows.from("aggregatorChannel")
            .channel(aggregatorChannel)
            .aggregate(a -> a.processor(collect, "aggregatingMethod"))
            .get();
}
@Transformer(inputChannel = "errorChannel", outputChannel = "aggregatorChannel")
public Message<?> errorChannelHandler(ErrorMessage errorMessage) throws RuntimeException {
    Message<?> failedMessage = ((MessagingException) errorMessage.getPayload())
            .getFailedMessage();
    Exception exception = (Exception) errorMessage.getPayload();
    return MessageBuilder.withPayload(exception.getMessage())
            .copyHeadersIfAbsent(failedMessage.getHeaders())
            .build();
}
You see, @Gateway is just a Java method. It has one return and may throw one exception. I'm still confused why people think that Spring Integration works somehow differently. It is fully based on Java and does nothing magic - it only calls Java methods.
Now let's imagine what you'd do if you developed with just raw Java. Right, you would wait for replies from both threads and build a single return to the caller.
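To make the raw Java idea concrete, here is a minimal sketch, not taken from the question's code. The class name ParallelProvisioning and the method runAll are hypothetical, and the two failing tasks simply stand in for provisionerA and provisionerB. It runs all tasks on a thread pool, waits for every one of them, and returns all failure messages in a single result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper, for illustration only.
class ParallelProvisioning {

    // Runs every task on the pool, waits for all of them, and returns the
    // messages of all failures (an empty list means every task succeeded).
    static List<String> runAll(List<Callable<Boolean>> tasks) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // invokeAll blocks until every task has completed,
            // either normally or by throwing.
            List<Future<Boolean>> futures = pool.invokeAll(tasks);
            List<String> errors = new ArrayList<>();
            for (Future<Boolean> future : futures) {
                try {
                    future.get();
                } catch (ExecutionException e) {
                    // The exception thrown inside the task is the cause.
                    errors.add(e.getCause().getMessage());
                }
            }
            return errors;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Callable<Boolean>> tasks = new ArrayList<>();
        // Stand-ins for the two provisioners, both failing.
        tasks.add(() -> { throw new IllegalStateException("provisionerA failed"); });
        tasks.add(() -> { throw new IllegalStateException("provisionerB failed"); });
        System.out.println(runAll(tasks));
    }
}
```

The caller gets one return value that carries both errors, which is the same shape of result the gateway method has to produce.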
We can do the same with Spring Integration: just use the Aggregator EIP. You can catch the error messages in that error channel and correlate them via their failedMessage. The .publishSubscribeChannel() can be supplied with the applySequence option, which is false by default. The aggregator can then just rely on the default correlationStrategy and gather the group of errors for you to return to the replyChannel in headers.
All the info you can find in the Reference Manual:
https://docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/messaging-routing-chapter.html#aggregator
https://docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/configuration.html#namespace-errorhandler
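As a rough illustration of what the aggregator does with the sequence details, here is a plain Java sketch. It is not Spring Integration code: SimpleAggregator, Part and accept are hypothetical names, and the sketch assumes each message carries a correlation id and a sequence size, as applying the sequence does. Parts are grouped by correlation id and the group is released once it holds as many parts as the sequence size.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of an aggregator; not the Spring API.
class SimpleAggregator {

    // Stand-in for a message carrying correlation and sequence-size details.
    static final class Part {
        final String correlationId;
        final int sequenceSize;
        final String payload;

        Part(String correlationId, int sequenceSize, String payload) {
            this.correlationId = correlationId;
            this.sequenceSize = sequenceSize;
            this.payload = payload;
        }
    }

    private final Map<String, List<String>> groups = new HashMap<>();

    // Adds the part to its group. Returns the complete group once all
    // parts of the sequence have arrived, otherwise an empty Optional.
    synchronized Optional<List<String>> accept(Part part) {
        List<String> group =
                groups.computeIfAbsent(part.correlationId, id -> new ArrayList<>());
        group.add(part.payload);
        if (group.size() == part.sequenceSize) {
            groups.remove(part.correlationId);
            return Optional.of(group);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        SimpleAggregator aggregator = new SimpleAggregator();
        // Two error payloads that originate from the same request message.
        aggregator.accept(new Part("request-1", 2, "provisionerA failed"));
        aggregator.accept(new Part("request-1", 2, "provisionerB failed"))
                .ifPresent(System.out::println);
    }
}
```

This is also why the error handler in the question's solution copies the headers of the failed message: without them, the error payload could not be matched to its group.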