StreamRetryTemplate for Spring Cloud Streams not retrying in integration tests

1.8k views Asked by At

We are utilizing Spring Cloud Streams that listen to a Kafka topic and call a rest service. We also implement a custom StreamRetryTemplate to specify what kind of errors we deem recoverable and which we do not. I cannot get consistent results between how it works at runtime and how it works in integration tests.

I've verified in debug mode that the exception is being thrown properly and that the RetryTemplate is being injected in properly but it just doesn't seem to be utilized in my integration tests.

@EnableBinding(Sink::class)
class MyListener(private val myService: Service) {

  @StreamListener(Sink.Input)
  fun consume(@Payload msg: MyMessage) = myService.process(msg)

  @SteamRetryTemplate
  fun getRetryTemplate() = RetryTemplate()
}

When I run this app and myService throws an exception I expect it to be retried, and it does so perfectly. But when I write integration tests with a wiremock server and have myService throw an exception it does not retry. I have assert statements to verify how many times my wiremock endpoint is hit.

Am I missing something specifically for retries to work in integration tests?

1

There are 1 answers

6
Gary Russell On BEST ANSWER

Are you using the test binder or the embedded kafka broker? The test binder is rather limited; using the embedded broker is preferred for full integration testing.

See Testing Applications in the Spring for Apache Kafka Documentation.

EDIT

@SpringBootApplication
@EnableBinding(Sink.class)
public class So55855151Application {

    public static void main(String[] args) {
        SpringApplication.run(So55855151Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) {
        System.out.println(in);
        throw new RuntimeException("fail");
    }

    @StreamRetryTemplate
    public RetryTemplate retrier() {
        return new RetryTemplate();
    }

}
spring.cloud.stream.bindings.input.group=input
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka
public class So55855151ApplicationTests {

    @Autowired
    private KafkaTemplate<byte[], byte[]> template;

    @Autowired
    private RetryTemplate retrier;

    @Test
    public void test() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5);
        this.retrier.registerListener(new RetryListener() {

            @Override
            public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
                System.out.println("open");
                latch.countDown();
                return true;
            }

            @Override
            public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
                    Throwable throwable) {

                System.out.println("close");
                latch.countDown();
            }

            @Override
            public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
                    Throwable throwable) {

                System.out.println("onError: " + throwable);
                latch.countDown();
            }

        });

        this.template.send("input", "test".getBytes());
        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
    }

}