I am trying to write an integration test for Kafka messaging in my application. When I run my tests, sometimes they pass and sometimes they fail. When they fail, the line below in the test class fails to return any message. I am not sure why it returns a message sometimes and not at other times. Any help in resolving this is greatly appreciated. All the configuration is in a yml file.
Map<String, MessageResponse> message = messageResponseTestListener.getMessages();
Producer
/**
 * Kafka client interface used to publish {@code MessageRequest} records.
 */
@KafkaClient
public interface MessageRequestTestProducer {
/**
 * Sends a message request to the topic resolved from the
 * {@code kafka.messageRequestTopic} property.
 *
 * @param id             value used as the Kafka record key
 * @param messageRequest payload to publish
 * @return a {@code Single} of the {@code RecordMetadata} for the send
 */
@Topic("${kafka.messageRequestTopic}")
Single<RecordMetadata> sendMessageRequest(@KafkaKey String id, MessageRequest messageRequest);
}
Consumer
/**
 * Kafka listener that records every {@code MessageResponse} received on the
 * topic resolved from {@code kafka.messageResponseTopic}, keyed by record key,
 * and counts down a latch so callers can wait for the first response.
 *
 * <p>The listener callback and the code reading {@link #getMessages()} are
 * expected to run on different threads (NOTE(review): Micronaut Kafka invokes
 * listeners from its consumer threads — confirm for the version in use), so the
 * collected messages are kept in a synchronized map.
 */
@Singleton
@KafkaListener(groupId = "test", offsetReset = OffsetReset.EARLIEST)
public class MessageResponseTestListener {

    /** Released once the first response has been stored. */
    private final CountDownLatch latch = new CountDownLatch(1);

    /**
     * Responses received so far, keyed by Kafka record key. Wrapped in a
     * synchronized map so writes from the listener are safely visible to readers
     * on other threads; a synchronized {@code HashMap} (rather than a
     * {@code ConcurrentHashMap}) keeps accepting a {@code null} key as before.
     * Fully qualified to avoid requiring an additional import.
     */
    private final Map<String, MessageResponse> messages =
            java.util.Collections.synchronizedMap(new HashMap<>());

    /**
     * @return the latch that reaches zero once the first response has been stored
     */
    public CountDownLatch getLatch() {
        return latch;
    }

    /**
     * Stores the received response under its record key and then releases the latch.
     *
     * @param key          Kafka record key
     * @param recordSingle the received response wrapped in a {@code Single}
     * @return the same {@code Single} with the store-and-release side effect attached
     */
    @Topic("${kafka.messageResponseTopic}")
    public Single<MessageResponse> eventOccurred(@KafkaKey String key, Single<MessageResponse> recordSingle) {
        return recordSingle.doOnSuccess(record -> {
            // Store first, then release: a waiter woken by the latch must find the entry.
            messages.put(key, record);
            latch.countDown();
        });
    }

    /**
     * @return the live (synchronized) map of responses received so far, keyed by record key
     */
    public Map<String, MessageResponse> getMessages() {
        return messages;
    }
}
KafkaTestBase
/**
 * Base class for Kafka integration tests. Starts a single Kafka container when
 * the class is loaded and exposes its bootstrap servers as the
 * {@code kafka.bootstrap.servers} property override.
 */
public class KafkaTestBase implements TestPropertyProvider {

    /** Kafka container shared by all tests extending this class. */
    protected static final KafkaContainer kafka =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    static {
        // Start the broker during class initialization so getProperties() can
        // report its bootstrap address.
        kafka.start();
    }

    /**
     * Supplier view of the property overrides.
     *
     * @return the same map as {@link #getProperties()}; never {@code null}
     */
    @Override
    public Map<String, String> get() {
        // Previously returned null, which hands callers of get() a null map.
        return getProperties();
    }

    /**
     * @return property overrides pointing {@code kafka.bootstrap.servers} at the
     *         started container
     */
    @NonNull
    @Override
    public Map<String, String> getProperties() {
        Map<String, String> propertyOverrideMap = new HashMap<>();
        propertyOverrideMap.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
        return propertyOverrideMap;
    }
}
Test Class
// Injected Kafka client used by test() to publish the message request.
@Inject
MessageRequestTestProducer messageRequestTestProducer;
// Injected listener whose latch and collected messages test() reads.
@Inject
MessageResponseTestListener messageResponseTestListener;
/**
 * Publishes a message request and verifies that the expected response is
 * collected by the response listener under the same key.
 *
 * @throws InterruptedException if the thread is interrupted while waiting for the response
 */
@Test
void test() throws InterruptedException {
    TEST("Message Testing Kafka version");
    String id = "123";
    MessageRequest messageRequest = buildCorrectRequest();
    messageRequestTestProducer.sendMessageRequest(id, messageRequest).blockingGet();

    // await() returns false on timeout; capture it so a slow consumer start fails
    // here with a clear cause instead of as a missing map entry below. The wait is
    // generous because it returns as soon as the latch is released.
    boolean received = messageResponseTestListener.getLatch().await(30, TimeUnit.SECONDS);

    WHEN("Message event received");
    assertTrue(received);
    Map<String, MessageResponse> message = messageResponseTestListener.getMessages();

    THEN("Message is produced");
    assertTrue(message.containsKey(id));
    // Look up by the same key that was sent and checked above.
    assertEquals(buildExpectedMessageResponse(), message.get(id));
}