I have two threads and each of the method in the thread throws an exception. How do I get all the errors that's thrown in each of the thread? In this code, the error channel just catch one of the errors. Basically my goal is to catch all the errors and send them to the caller (a rest controller). Any help will be appreciated. Thanks.

Integration.java

public IntegrationFlow provisionUserFlow() {
return IntegrationFlows.from("input.channel")
  .publishSubscribeChannel(Executors.newCachedThreadPool(),
      s -> s
            .subscribe(f -> f.handle(provisionerA, "provision"))
            .subscribe(f -> f.handle(provisionerB, "provision"))
  .get();
}

@ServiceActivator( inputChannel = "errorChannel", outputChannel = "replyChannel")

public boolean processErrors(Exception message) throws RuntimeException{

System.out.println("Message" + message.getMessage());
System.out.println ("******************************");

throw new RuntimeException(message.getMessage());
}

MGateway.java

@MessagingGateway(errorChannel = "errorChannel")
public interface MGateway {

@Gateway(requestChannel = "input.channel", replyChannel = "replyChannel") 
boolean invokeProvisioner(User user);
}

SOLUTION

@Bean
public IntegrationFlow provisionUserFlow() {
return
    IntegrationFlows.from("input.channel")
    .publishSubscribeChannel(Executors.newCachedThreadPool(),
        s -> s.applySequence(true)
            .subscribe(f -> f.enrichHeaders(e -> e.header(MessageHeaders.ERROR_CHANNEL, "errorChannel", true))
                .handle(provisionerA, "provision")
                .channel("aggregatorChannel")
            )
            .subscribe(f -> f.enrichHeaders(e -> e.header(MessageHeaders.ERROR_CHANNEL, "errorChannel", true))
                .handle(provisionerB, "provision")
                .channel("aggregatorChannel"))
            )
        .get();
}

@Bean
public IntegrationFlow aggregateFlow() {
return IntegrationFlows.from("aggregatorChannel")
    .channel( aggregatorChannel)
    .aggregate( a -> a.processor( collect, "aggregatingMethod"))
    .get();
}

@Transformer( inputChannel = "errorChannel", outputChannel = "aggregatorChannel")
public Message<?> errorChannelHandler(ErrorMessage errorMessage) throws RuntimeException {

Message<?> failedMessage =  ((MessagingException) errorMessage.getPayload())
    .getFailedMessage();

Exception exception = (Exception) errorMessage.getPayload();

return  MessageBuilder.withPayload( exception.getMessage())
       .copyHeadersIfAbsent( failedMessage.getHeaders() )
       .build();
}
1

There are 1 answers

23
Artem Bilan On BEST ANSWER

You see @Gateway is just Java method. It has one return and may throw one exception. I'm still confused why people think that Spring Integration works somehow different. It is fully based on Java and does nothing magic - only calls java methods.

Now let's imaging what you'd do if you develop just with raw Java. Right, you would wait for relies from both threads and build a single return to the caller.

The same we can do with Spring Integration. Just need to use Aggregator EIP. You can catch error messages in that error channel and correlate them via their failedMessages. The .publishSubscribeChannel() can be supplied with the option:

/**
 * Specify whether to apply the sequence number and size headers to the
 * messages prior to invoking the subscribed handlers. By default, this
 * value is <code>false</code> meaning that sequence headers will
 * <em>not</em> be applied. If planning to use an Aggregator downstream
 * with the default correlation and completion strategies, you should set
 * this flag to <code>true</code>.
 * @param applySequence true if the sequence information should be applied.
 */
public void setApplySequence(boolean applySequence) {

which is false by default. The aggregator then can just rely on the default correlationStrategy and gather the group of errors for you to return to the replyChannel in headers.

All the info you can find in the Reference Manual:

https://docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/messaging-routing-chapter.html#aggregator

https://docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/configuration.html#namespace-errorhandler