Spring Integration: Splitter exception causes subsequent messages to abort

1.8k views Asked by At

I have the following configuration, where Im trying to process a list of messages that are split by a splitter. The problem Im facing is that an exception in one of the individual message processing causes all subsequent messages to NOT process. Is there something Im doing wrong?

<int:chain input-channel="exceptionTestChannel">
  <int:splitter/>
  <int:header-enricher>
    <int:error-channel ref="myErrorChannel"/>
  </int:header-enricher>
  <int:service-activator id="testExceptionService"
    ref="testExceptionService" method="throwException"></int:service-activator>
  <int:aggregator></int:aggregator>
</int:chain>

<int:channel id="myErrorChannel">
  <int:interceptors>
    <int:wire-tap channel="loggerChannel" />
  </int:interceptors>
</int:channel>

<int:channel id="exceptionTestChannel">
  <int:interceptors>
    <int:wire-tap channel="loggerChannel" />
  </int:interceptors>
</int:channel>


<int:transformer ref="errorUnwrapper" input-channel="myErrorChannel"  />

Exception is below

2014-11-12 09:56:30,076 ERROR [com.test.transform.ErrorUnwrapper] - [Payload=org.springframework.integration.MessageHandlingException: com.test.TestServiceException: System Error. Trial test exception][Headers={timestamp=1415814990076, id=a2f50a6d-4de6-3785-e8d8-a5ef9f46c27f}]
org.springframework.integration.MessageHandlingException: com.test.TestServiceException: System Error. Trial test exception
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:76)
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:67)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
    at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
    at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
    at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
    at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:167)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:131)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:178)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:149)
    at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
    at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:97)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:199)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:143)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:141)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:273)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:49)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:268)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:452)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:314)
    at java.util.concurrent.FutureTask.run(FutureTask.java:149)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:109)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:218)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:883)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:919)
    at java.lang.Thread.run(Thread.java:736)
Caused by: com.test.TestServiceException: System Error. Trial test exception
    at com.test.TestServiceException.getSystemErrorInstance(TestServiceException.java:31)
    at com.test.TestExceptionService.throwException(TestExceptionService.java:13)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:60)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:37)
    at java.lang.reflect.Method.invoke(Method.java:611)
    at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:69)
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:122)
    at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:44)
    at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:258)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:82)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:102)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:103)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:144)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:268)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:142)
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:73)
    ... 50 more
2014-11-12 09:56:30,076 WARN [org.springframework.integration.channel.MessagePublishingErrorHandler] - Error message was not delivered.
org.springframework.integration.support.channel.ChannelResolutionException: no output-channel or replyChannel header available
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:218)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:178)
    at org.springframework.integration.channel.MessagePublishingErrorHandler.handleError(MessagePublishingErrorHandler.java:83)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:49)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:268)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:452)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:314)
    at java.util.concurrent.FutureTask.run(FutureTask.java:149)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:109)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:218)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:883)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:919)
    at java.lang.Thread.run(Thread.java:736)
2

There are 2 answers

0
Artem Bilan On BEST ANSWER

an exception in one of the individual message processing causes all subsequent messages to NOT process

You should agree with that it is normal behaviour for any single-thread process: when exception is caused somewhere, all other code won't be invoked.

The simple split looks like for (Object o : collection).

To achieve your requirements you need to use ExecutorChannel (or QueueChannel) as an output-channel for <splitter>.

Of course, in this case you should pull that splitter outside of <chain>.

Having that threading shift your messages won't affect each other.

0
Gary Russell On

@Artem is correct but another alternative, if you want everything to run on the same thread is to insert a gateway mid-flow.

Just adding an error-channel header like that won't work...

<chain>
    <splitter />
    <service-activator ref="gw" />
<chain>

<gateway id="gw" request-channel="foo" error-channel="errors" 
      reply-timeout="0" />

<logging-channel-adapter channel="errors" /> 

<chain input-channel="foo">
    <int:service-activator id="testExceptionService"
        ref="testExceptionService" method="throwException"></int:service-activator>
    <int:aggregator></int:aggregator>
</chain>

However, dropping splits like that will cause the default aggregator to never complete the group so you need a custom release strategy that will release the group even with missing splits.