We are utilizing Spring Cloud Streams that listen to a Kafka topic and call a rest service. We also implement a custom StreamRetryTemplate to specify what kind of errors we deem recoverable and which we do not. I cannot get consistent results between how it works at runtime and how it works in integration tests.
I've verified in debug mode that the exception is being thrown properly and that the RetryTemplate is being injected in properly but it just doesn't seem to be utilized in my integration tests.
@EnableBinding(Sink::class)
class MyListener(private val myService: Service) {
@StreamListener(Sink.Input)
fun consume(@Payload msg: MyMessage) = myService.process(msg)
@SteamRetryTemplate
fun getRetryTemplate() = RetryTemplate()
}
When I run this app and myService throws an exception I expect it to be retried, and it does so perfectly. But when I write integration tests with a wiremock server and have myService throw an exception it does not retry. I have assert statements to verify how many times my wiremock endpoint is hit.
Am I missing something specifically for retries to work in integration tests?
Are you using the test binder or the embedded kafka broker? The test binder is rather limited; using the embedded broker is preferred for full integration testing.
See Testing Applications in the Spring for Apache Kafka Documentation.
EDIT