Intercept incoming messages on a Spring Cloud Stream SubscribableChannel

The postReceive method of org.springframework.messaging.support.ChannelInterceptor is not invoked for an org.springframework.messaging.SubscribableChannel. Is there any way to intercept all incoming messages for a method annotated with @StreamListener(Sink.INPUT)?

For example:

Intercept the message before it reaches the handle method:

@StreamListener(Sink.INPUT)
public void handle(Foo foo) {
    // ...
}

Below is my setup for Spring Cloud Stream:

public interface EventSink {

    String INPUT1 = "input1";
    String INPUT2 = "input2";

    @Input(INPUT1)
    SubscribableChannel input1();

    @Input(INPUT2)
    SubscribableChannel input2();
}

public interface EventSource {

    String OUTPUT1 = "output1";
    String OUTPUT2 = "output2";

    @Output(OUTPUT1)
    MessageChannel output1();

    @Output(OUTPUT2)
    MessageChannel output2();
}

spring:
  cloud:
    stream:
      bindings:
        input1:
          destination: input1
        input2:
          destination: input2
        output1:
          destination: output1
        output2:
          destination: output2

public class EventHandler {

    @StreamListener(EventSink.INPUT1)
    public void handle(Foo1 foo) {
        // ...
    }

    @StreamListener(EventSink.INPUT2)
    public void handle(Foo2 foo) {
        // ...
    }

}

@Service
public class Bar1Service {

    @Autowired
    private EventSource source;

    public void bar1() {
        source.output1().send(MessageBuilder.withPayload("bar1").build());
    }

}

@Service
public class Bar2Service {

    @Autowired
    private EventSource source;

    public void bar2() {
        source.output2().send(MessageBuilder.withPayload("bar2").build());
    }

}

There is 1 answer

Gary Russell (best answer)

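With a DirectChannel, the binder invokes your listener on the same thread, so preSend is appropriate here. (postReceive only applies to a PollableChannel, which is why it is never called on a SubscribableChannel.)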
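To make the threading point concrete, here is a minimal plain-Java sketch of how a synchronous, direct-style channel behaves. It is not Spring's DirectChannel: the class DirectChannelSketch, its methods, and the use of String as the message type are hypothetical stand-ins chosen for illustration. It only assumes the behavior described above, namely that the interceptor's preSend hook and the subscribed handler both run on the sender's thread, with preSend first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical, simplified model of a direct (synchronous) channel.
// It is NOT Spring's DirectChannel; it only mimics the call order.
public class DirectChannelSketch {

    private final List<UnaryOperator<String>> interceptors = new ArrayList<>();
    private Consumer<String> handler = message -> { };

    public void addInterceptor(UnaryOperator<String> preSend) {
        interceptors.add(preSend);
    }

    public void subscribe(Consumer<String> handler) {
        this.handler = handler;
    }

    // Runs every preSend hook, then the handler, all on the caller's thread.
    public boolean send(String message) {
        String current = message;
        for (UnaryOperator<String> preSend : interceptors) {
            current = preSend.apply(current);
            if (current == null) {
                return false; // a null result from preSend drops the message
            }
        }
        handler.accept(current);
        return true;
    }

    // Records which hook ran on which thread during a single send.
    public static List<String> demo() {
        List<String> events = new ArrayList<>();
        DirectChannelSketch input = new DirectChannelSketch();
        input.addInterceptor(message -> {
            events.add("preSend:" + Thread.currentThread().getName());
            return message;
        });
        input.subscribe(message ->
                events.add("handle:" + Thread.currentThread().getName()));
        input.send("foo");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because nothing hands the message off to another thread, anything the preSend hook does (logging, adding a header, setting up context) has already happened by the time the handler is invoked.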

However, you don't have to mess with ThreadLocal; you can access headers directly through the method signature:

@StreamListener(Processor.INPUT)
public void handle(Foo foo, @Header("bar") String bar) {
    // ...
}

EDIT

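import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.ChannelInterceptorAdapter;

@SpringBootApplication
@EnableBinding(Processor.class)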
public class So41459187Application {

    public static void main(String[] args) {
        SpringApplication.run(So41459187Application.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String handle(String in) {
        return in.toUpperCase();
    }

    @Configuration
    public static class Config {

        @Bean
        public BeanPostProcessor channelConfigurer() {
            return new BeanPostProcessor() {

                @Override
                public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
                    return bean;
                }

                @Override
                public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
                    // Bound channels are registered as beans under their binding names ("input", "output").
                    if ("input".equals(beanName)) {
                        ((AbstractMessageChannel) bean).addInterceptor(new ChannelInterceptorAdapter() {

                            @Override
                            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                                System.out.println("PreSend on INPUT: " + message);
                                return message;
                            }

                        });
                    }
                    else if ("output".equals(beanName)) {
                        ((AbstractMessageChannel) bean).addInterceptor(new ChannelInterceptorAdapter() {

                            @Override
                            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                                System.out.println("PreSend on OUTPUT: " + message);
                                return message;
                            }

                        });
                    }
                    return bean;
                }

            };
        }
    }

}