I am new to Spring and Kafka and am trying to learn both by building small projects.
I have created a producer-consumer application using Spring Cloud Stream and Apache Kafka, but I cannot figure out how to test whether the data has been pushed to the topic, or whether my consumer actually receives the message.
I have searched for this, but could not find anything that solves my problem.
How can I test my producer and consumer?
Here is my code:
import brave.ScopedSpan;
import brave.Span;
import brave.Tracer;
import brave.propagation.TraceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;

@SpringBootApplication
public class DemoApplication {

    Logger log = LoggerFactory.getLogger(DemoApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    // Supplier
    @Bean
    public Supplier<String> supplierBinding1(Tracer tracer) {
        return () -> {
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            String msg = "Hello";
            return msg;
        };
    }

    // Consumer
    @Bean
    public Consumer<String> consumerBinding1() {
        return msg -> {
            log.info("Message received: " + msg);
        };
    }
}
Here is the application.yml file:
spring:
  application:
    name: "producer_consumer"
  cloud:
    function:
      definition: consumerBinding1;supplierBinding1
    stream:
      bindings:
        supplierBinding1-out-0:
          destination: supplier-topic
        consumerBinding1-in-0:
          destination: supplier-topic
Here is my test file:
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import java.time.Duration;
import java.util.*;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
class AstMetricsApplicationTests {

    Logger log = LoggerFactory.getLogger(AstMetricsApplicationTests.class);

    @Container
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    // Initializing Bootstrap server
    @DynamicPropertySource
    public static void initKafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

    @Autowired
    AppConfig app;

    // Container Tests
    @Test
    public void containerCreated() {
        await().pollInterval(Duration.ofSeconds(5)).untilAsserted(() -> {
            boolean check = kafka.isCreated();
            assertEquals(true, check);
        });
    }

    @Test
    public void containerRunning() {
        await().pollInterval(Duration.ofSeconds(5)).untilAsserted(() -> {
            boolean check = kafka.isRunning();
            assertEquals(true, check);
        });
    }

    // Sender Tests
    @Test
    public void checkSender() {
        // join() blocks until the send completes, so a failed send or a failed
        // assertion in the callback fails the test instead of being lost.
        kafkaTemplate.send("supplier-topic", "1234").whenComplete((res, err) -> {
            boolean sent = false;
            if (Objects.nonNull(err)) {
                log.info("Unable to send message");
            } else {
                sent = true;
                log.info("Message sent successfully");
            }
            assertEquals(true, sent);
        }).join();
    }

    // How do I test producer and consumer
}
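Since both beans in `DemoApplication` are plain `java.util.function` types, one broker-free check is to call them directly. The following is only a minimal sketch under some assumptions: `FunctionBeansSketch` is a hypothetical stand-in class (not part of the project), the supplier drops the `Thread.sleep` and the unused `Tracer` parameter, and the consumer records messages in a list rather than logging them so that there is something to assert on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for DemoApplication's two beans, without Spring or Kafka.
class FunctionBeansSketch {

    // Same payload as supplierBinding1, minus the 1.5 s sleep and the Tracer argument.
    static Supplier<String> supplierBinding1() {
        return () -> "Hello";
    }

    // Like consumerBinding1, but stores each message in the given list
    // instead of logging it (an assumption made for testability).
    static Consumer<String> consumerBinding1(List<String> received) {
        return received::add;
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        Supplier<String> supplier = supplierBinding1();
        Consumer<String> consumer = consumerBinding1(received);

        // Stand-in for the broker: pass the supplier's output straight to the consumer.
        consumer.accept(supplier.get());

        if (!received.equals(List.of("Hello"))) {
            throw new AssertionError("unexpected messages: " + received);
        }
        System.out.println("received = " + received);
    }
}
```

This only exercises the function logic. It says nothing about the bindings in `application.yml` or about whether a message actually travels through `supplier-topic`, so it does not replace a test against a real or containerized broker.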
Can someone please help me test my producer and consumer?
Thank you in advance.