Why does the Kafka messaging queue sometimes return no message in an integration test?

525 views Asked by At

I am trying to write an integration test for Kafka messaging in my application. When I run my tests, they sometimes pass and sometimes fail. When they fail, the line below in the test class does not return any message. I am not sure why it returns a message on some runs and not on others. Any help in resolving this is greatly appreciated. All the configuration is in a yml file.

Map<String, MessageResponse> message = messageResponseTestListener.getMessages(); 

Producer

/**
 * Test-side Kafka producer, declared as a {@code @KafkaClient} interface.
 *
 * <p>The single method publishes a {@link MessageRequest} to the topic whose
 * name is read from the {@code kafka.messageRequestTopic} property.
 */
@KafkaClient
public interface MessageRequestTestProducer {

     /**
      * Sends a request to the topic configured under {@code kafka.messageRequestTopic}.
      *
      * @param id             value used as the Kafka record key (marked with {@code @KafkaKey})
      * @param messageRequest payload to publish
      * @return a {@code Single} carrying the {@code RecordMetadata} of the send;
      *         the test calls {@code blockingGet()} on it before waiting for the response
      */
     @Topic("${kafka.messageRequestTopic}")
     Single<RecordMetadata> sendMessageRequest(@KafkaKey String id, MessageRequest messageRequest);
} 

Consumer

/**
 * Test-side Kafka listener that records every {@link MessageResponse} received
 * on the topic configured under {@code kafka.messageResponseTopic}.
 *
 * <p>Records are stored by Kafka key and a one-shot latch is released when the
 * first record arrives, so a test can block until something has been consumed.
 *
 * <p>The listener callback runs on a different thread than the test that reads
 * {@link #getMessages()}, so the backing map is a synchronized map rather than
 * a bare {@code HashMap}. Callers that iterate over the returned map must
 * synchronize on it; single calls such as {@code get}/{@code containsKey} are safe.
 */
@Singleton
@KafkaListener(groupId = "test", offsetReset = OffsetReset.EARLIEST)
public class MessageResponseTestListener {

    /** Released once, on the first received record; it is never reset. */
    private final CountDownLatch latch = new CountDownLatch(1);

    /**
     * Received responses keyed by Kafka record key. A synchronized wrapper is
     * used (instead of ConcurrentHashMap) so that a record without a key, i.e.
     * a {@code null} key, is still accepted exactly as with the original HashMap.
     */
    private final Map<String, MessageResponse> messages =
            java.util.Collections.synchronizedMap(new HashMap<>());

    /**
     * @return the latch that reaches zero after the first record has been stored
     */
    public CountDownLatch getLatch() {
        return latch;
    }

    /**
     * Listener callback for the response topic.
     *
     * @param key          Kafka record key
     * @param recordSingle the received record wrapped in a {@code Single}
     * @return the same {@code Single}, with a success hook that stores the
     *         record and then releases the latch
     */
    @Topic("${kafka.messageResponseTopic}")
    public Single<MessageResponse> eventOccurred(@KafkaKey String key, Single<MessageResponse> recordSingle) {
        return recordSingle.doOnSuccess(record -> {
            // Store first, then count down: a thread woken by the latch must
            // already be able to see the record in the map.
            messages.put(key, record);
            latch.countDown();
        });
    }

    /**
     * @return the live (not copied) map of received responses, keyed by record key
     */
    public Map<String, MessageResponse> getMessages() {
        return messages;
    }
}

KafkaTestBase

/**
 * Base class for Kafka integration tests.
 *
 * <p>Starts one shared {@code KafkaContainer} when the class is loaded and
 * exposes its bootstrap address as the {@code kafka.bootstrap.servers}
 * property override.
 */
public class KafkaTestBase implements TestPropertyProvider {

    /** Shared broker container; started once in the static initializer below. */
    protected static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    static {
        // Must run before getProperties() is called, because the bootstrap
        // address is read from the running container.
        kafka.start();
    }

    /**
     * Supplier-style accessor for the property overrides.
     *
     * <p>Delegates to {@link #getProperties()} instead of returning
     * {@code null}, so both accessors yield the same overrides regardless of
     * which one a caller uses.
     *
     * @return the same map as {@link #getProperties()}
     */
    @Override
    public Map<String, String> get() {
        return getProperties();
    }

    /**
     * @return a new mutable map containing {@code kafka.bootstrap.servers}
     *         set to the container's bootstrap servers
     */
    @NonNull
    @Override
    public Map<String, String> getProperties() {
        Map<String, String> propertyOverrideMap = new HashMap<>();
        propertyOverrideMap.put("kafka.bootstrap.servers", kafka.getBootstrapServers());

        return propertyOverrideMap;
    }
}

Test Class

// Injected producer; the test uses it to publish the request message.
@Inject
MessageRequestTestProducer messageRequestTestProducer;

// Injected listener; the test reads its latch and its map of received responses.
@Inject
MessageResponseTestListener messageResponseTestListener;

/**
 * Publishes one request and verifies that a matching response is consumed.
 *
 * <p>The wait for the listener is asserted: if no response arrives within the
 * timeout the test fails at the wait itself, instead of continuing and
 * failing later on an empty map.
 *
 * @throws InterruptedException if the thread is interrupted while waiting for the response
 */
@Test
void test() throws InterruptedException {

    TEST("Message Testing Kafka version");

    String id = "123";
    MessageRequest messageRequest = buildCorrectRequest();
    // Block until the send has completed so the wait below only covers the
    // time up to the response being consumed.
    messageRequestTestProducer.sendMessageRequest(id, messageRequest).blockingGet();

    // await() returns false on timeout; the original discarded that result.
    // The ceiling is raised from 2 s to 10 s; await still returns as soon as
    // the latch is released, so a passing run is not slowed down.
    boolean responseReceived = messageResponseTestListener.getLatch().await(10, TimeUnit.SECONDS);
    assertTrue(responseReceived);
    WHEN("Message event received");

    Map<String, MessageResponse> message = messageResponseTestListener.getMessages();

    THEN("Message is produced");

    assertTrue(message.containsKey(id));
    // Look up by the same key that was sent and checked above. The original
    // used `cdrId`, which is not defined in the visible code.
    // NOTE(review): confirm `cdrId` was not an intentionally different key.
    assertEquals(buildExpectedMessageResponse(), message.get(id));

}
0

There are 0 answers