How do I (integration) test @StreamListener with RabbitMQ?


I'm trying to write an integration test for my Spring Boot listener app. The test should start the app, create a message, drop it into the output channel, and then wait for the message to be picked up and processed.

Here's what I have for the test:

    package com.example;

    import org.junit.Before;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    import org.springframework.test.context.ActiveProfiles;
    import org.springframework.test.context.junit4.SpringRunner;

    import static org.assertj.core.api.Assertions.assertThat;

    @RunWith(SpringRunner.class)
    @SpringBootTest
    @ActiveProfiles("scratch")
    public class DemoApplicationTests {

        @Component
        @EnableBinding(Source.class)
        static class TestMessageSource {

            @Autowired
            private Source source;

            public void sendMessage(String message) {
                new Thread(() -> {
                    Message<Greeting> msg = MessageBuilder.withPayload(new Greeting(message)).build();
                    source.output().send(msg);
                }).start();
            }
        }

        static class CounterFakeService implements CounterService {
            public int count = 0;
            public Greeting greeting;
            @Override
            public void recordCount(Greeting greeting) {
                count++;
                this.greeting = greeting;
            }
        }

        private CounterFakeService fakeCounterService;
        @Bean
        CounterService counterService() {
            return fakeCounterService;
        }

        @Autowired
        TestMessageSource messageSource;

        @Before
        public void before() {
            fakeCounterService = new  CounterFakeService();
    //      this.mvc = MockMvcBuilders.webAppContextSetup(this.context).build();
        }

        @Test
        public void doesProcessMessages() throws InterruptedException {
            assertThat(fakeCounterService.count).isEqualTo(0);
            messageSource.sendMessage("test");
            Thread.sleep(5000);
            assertThat(fakeCounterService.count).isEqualTo(1);
            assertThat(fakeCounterService.greeting.getMessage()).isEqualTo("test");
        }

    }

In application-scratch.properties, I have the output channel bound to the input exchange:

    spring.rabbitmq.host=rabbitmq.local.pcfdev.io
    spring.rabbitmq.port=5672
    spring.rabbitmq.password=i9jbk2o3ingqtkrekgm988bvui
    spring.rabbitmq.username=8cf073b0-a2ff-450a-bee5-3954cb6c191f
    spring.rabbitmq.virtual-host=5fc33451-4ec0-440e-90a1-6e7ed0c025f9
    spring.cloud.stream.bindings.output.destination=input

However, at the moment the test fails because no message is received. What should I be looking for?
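
On the waiting step: the fixed Thread.sleep(5000) in the test above makes it slow and timing-dependent. One possible alternative, shown here only as a minimal sketch, is a fake service that counts down a latch when the listener calls it, so the test returns as soon as the message arrives or fails after a timeout. LatchingCounter and awaitMessages are hypothetical names, not part of the original code, and the payload is a plain String rather than Greeting so the snippet compiles on its own.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for CounterFakeService. The payload is a String here
// (an assumption) so the sketch does not depend on the Greeting class.
class LatchingCounter {
    private final CountDownLatch latch;
    private final AtomicInteger count = new AtomicInteger();
    private final AtomicReference<String> last = new AtomicReference<>();

    LatchingCounter(int expectedMessages) {
        this.latch = new CountDownLatch(expectedMessages);
    }

    // Called by the listener, typically on a container thread.
    void recordCount(String message) {
        last.set(message);
        count.incrementAndGet();
        latch.countDown();
    }

    // Waits until the expected number of messages was recorded or the timeout
    // elapsed. Returns false on timeout or if the waiting thread is interrupted.
    boolean awaitMessages(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int getCount() { return count.get(); }

    String getLast() { return last.get(); }
}

class LatchingCounterDemo {
    public static void main(String[] args) {
        LatchingCounter counter = new LatchingCounter(1);
        // Simulates the listener delivering one message on another thread.
        new Thread(() -> counter.recordCount("test")).start();
        boolean arrived = counter.awaitMessages(5, TimeUnit.SECONDS);
        System.out.println("arrived=" + arrived + ", count=" + counter.getCount());
    }
}
```

In the test, the sleep and the first count assertion would then collapse into a single check that awaitMessages(5, TimeUnit.SECONDS) returned true.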

EDIT - listener code for completeness

PS: I've verified that this actually works with another app sending messages to the input exchange. I just can't get the test to work :(

    @RestController
    @SpringBootApplication
    @EnableDiscoveryClient
    @EnableBinding(Sink.class)
    public class DemoApplication {
        private static Logger log = LoggerFactory.getLogger(DemoApplication.class);

        @Autowired
        private Config config;

        @Autowired
        CounterServiceImpl counterService;

        @StreamListener(Sink.INPUT)
        public void handle(Greeting greeting) {
            log.info("in handle(Greeting), {}", greeting);
            counterService.recordCount(greeting);
        }

        public static void main(String[] args) {
            SpringApplication.run(DemoApplication.class, args);
        }
    }


There are 2 answers

Raghu (BEST ANSWER)

Facepalm time! :) I had the actual listener running in the background, and it was picking up all the messages, which is why the test never passed.
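
For context, this kind of interference is what one would expect if the background instance and the test context consume from the same queue, for example because both bind the input channel under the same consumer group. That is an assumption, since the question does not show the listener's group settings. Under that assumption, a possible way to keep the two apart is to give the test profile its own group, sketched here with a made-up group name:

```properties
# application-scratch.properties (test profile only)
# "demo-it" is a hypothetical group name; it only needs to differ from the
# group used by the instance running in the background.
spring.cloud.stream.bindings.input.group=demo-it
```

Each group gets its own queue on the destination, so the test would receive its own copy of every message. Stopping the background instance remains the simpler fix.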

Ilayaperumal Gopinathan

I don't see any @StreamListener configuration for the CounterService on the inbound channel. You also need binding configuration that points the inbound channel at the same destination, input.
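
As a rough sketch of the binding configuration meant here, assuming the listener uses the default Sink channel name input and the test keeps the output binding from the question, both channels can be pointed at the same destination explicitly:

```properties
# Producer side (already in the question): channel "output" publishes to destination "input"
spring.cloud.stream.bindings.output.destination=input
# Consumer side: channel "input" consumes from the same destination
spring.cloud.stream.bindings.input.destination=input
```

When no destination is set, Spring Cloud Stream falls back to the channel name, so the second line mostly makes the default explicit.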