I'm trying to run some integration tests for a Kafka consumer with
org.springframework.kafka.test.context.EmbeddedKafka.
I'm currently letting spring-boot-starter-parent manage the dependency versions. Here is the relevant part of the pom.xml file:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.4</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
    <java.version>17</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
Here is the Kafka consumer code:
@Configuration
public class KafkaEventListener {
    @RetryableTopic(
            attempts = "#{'${kafka.max.retry.attempts}'}",
            autoCreateTopics = "#{'${kafka.auto.create.retry.topics}'}",
            backoff = @Backoff(
                    delayExpression = "#{'${kafka.retry.init-interval}'}",
                    multiplierExpression = "#{'${kafka.retry.backoff.multiplier}'}"),
            include = { Exception.class },
            timeout = "#{'${kafka.max.retry.duration}'}",
            topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
            dltStrategy = DltStrategy.FAIL_ON_ERROR)
    @KafkaListener(topics = "${kafka.topic.test}", groupId = "${kafka.group.id.test}",
            containerFactory = "testKafkaListenerContainerFactory")
    public void listen(@Payload MetadataMessage input, @Header(KafkaHeaders.OFFSET) String offset,
            Acknowledgment acknowledgment) {
        System.out.println(input.getValue());
        // The container factory uses AckMode.MANUAL_IMMEDIATE, so the offset is committed explicitly.
        acknowledgment.acknowledge();
    }
    @DltHandler
    public void deadLetterHandler(@Payload(required = false) MetadataMessage data,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        // data may be null (required = false), so let the formatter render it instead of calling toString().
        System.out.println(String.format("Event from topic %s has been dead-lettered. Event data : %s", topic, data));
    }
}
Here is the Kafka configuration class:
@Configuration
public class KafkaConsumerConfig {
    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;
    @Value(value = "${kafka.group.id.test}")
    private String groupId;
    private Map<String, Object> consumerFactoryConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // Disabled kafka auto acknowledgement to gain more flexibility
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return props;
    }
    @Bean
    public ConsumerFactory<String, MetadataMessage> metadataConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerFactoryConfigs(), new StringDeserializer(),
                new ErrorHandlingDeserializer<>(new JsonDeserializer<>(MetadataMessage.class)));
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MetadataMessage> testKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MetadataMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(metadataConsumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
Here is the test class:
@SpringBootTest(classes = EmbeddedKafkaApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@TestPropertySource(locations = { "classpath:application.properties" })
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:29092", "port=29092" })
public class EmbeddedKafkaTest {
    @Autowired
    private KafkaEventListener kafkaEventListener;
    @Test
    public void testKafkaEvent() {
        kafkaEventListener.listen(new MetadataMessage("kafka message from test"), "1", mock(Acknowledgment.class));
    }
}
With spring-boot-starter-parent version 3.1.10 the test passes. However, when I switch to a newer version of spring-boot-starter-parent, the test fails.
With 3.1.10, I see these logs at startup when I run the test cases:
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::               (v3.1.10)
2024-03-31T16:37:37.354+05:30 INFO 26280 --- [ main] k.utils.Log4jControllerRegistration$ : Registered kafka:type=kafka.Log4jController MBean
2024-03-31T16:37:37.448+05:30 INFO 26280 --- [ main] o.a.zookeeper.server.ZooKeeperServer :
2024-03-31T16:37:37.448+05:30 INFO 26280 --- [ main] o.a.zookeeper.server.ZooKeeperServer : ______ _
2024-03-31T16:37:37.448+05:30 INFO 26280 --- [ main] o.a.zookeeper.server.ZooKeeperServer : |___ / | |
2024-03-31T16:37:37.448+05:30 INFO 26280 --- [ main] o.a.zookeeper.server.ZooKeeperServer : / / ___ ___ | | __ ___ ___ _ __ ___ _ __
2024-03-31T16:37:37.448+05:30 INFO 26280 --- [ main] o.a.zookeeper.server.ZooKeeperServer : / / / _ \ / _ \ | |/ / / _ \ / _ \ | '_ \ / _ \ | '__|
2024-03-31T16:37:37.448+05:30 INFO 26280 --- [ main] o.a.zookeeper.server.ZooKeeperServer : / /__ | (_) | | (_) | | < | __/ | __/ | |_) | | __/ | |
2024-03-31T16:37:37.448+05:30 INFO 26280 --- [ main] o.a.zookeeper.server.ZooKeeperServer : /_____| \___/ \___/ |_|\_\ \___| \___| | .__/ \___| |_|
2024-03-31T16:37:37.448+05:30 INFO 26280 --- [ main] o.a.zookeeper.server.ZooKeeperServer : | |
2024-03-31T16:37:37.448+05:30 INFO 26280 --- [ main] o.a.zookeeper.server.ZooKeeperServer : |_|
2024-03-31T16:37:37.448+05:30 INFO 26280 --- [ main] o.a.zookeeper.server.ZooKeeperServer :
2024-03-31T16:37:37.460+05:30 INFO 26280 --- [ main] o.a.zookeeper.server.ZooKeeperServer : Server environment:zookeeper.version=3.6.4--d65253dcf68e9097c6e95a126463fd5fdeb4521c, built on 12/18/2022 18:10 GMT
2024-03-31T16:37:37.460+05:30 INFO 26280 --- [ main] o.a.zookeeper.server.ZooKeeperServer : Server environment:host.name=SADEEP-M.Zone24x7.lk
2024-03-31T16:37:37.460+05:30 INFO 26280 --- [ main] o.a.zookeeper.server.ZooKeeperServer : Server environment:java.version=17.0.8.1
2024-03-31T16:37:37.460+05:30 INFO 26280 --- [ main] o.a.zookeeper.server.ZooKeeperServer : Server environment:java.vendor=Amazon.com Inc.
These are the final log lines of a successful run with version 3.1.10:
2024-03-31T16:37:41.551+05:30 INFO 26280 --- [ner#0-dlt-0-C-1] o.s.k.l.KafkaMessageListenerContainer : testGroupId-dlt: partitions assigned: [testTopic-dlt-0]
2024-03-31T16:37:41.551+05:30 INFO 26280 --- [0-retry-1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : testGroupId-retry-1: partitions assigned: [testTopic-retry-1-0]
2024-03-31T16:37:41.551+05:30 INFO 26280 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : testGroupId: partitions assigned: [testTopic-0]
2024-03-31T16:37:41.551+05:30 INFO 26280 --- [0-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : testGroupId-retry-0: partitions assigned: [testTopic-retry-0-0]
kafka message from test
With 3.1.10, I can clearly see that the ZooKeeperServer is running.
But after switching to a newer version such as 3.2.0 or the latest (3.2.4), the ZooKeeperServer logs no longer appear and the test fails to run. Here are some logs from the 3.2.4 run:
2024-03-31T16:46:29.072+05:30 INFO 25024 --- [embedded-kafka] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.6.1
2024-03-31T16:46:29.072+05:30 INFO 25024 --- [embedded-kafka] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 5e3c2b738d253ff5
2024-03-31T16:46:29.072+05:30 INFO 25024 --- [embedded-kafka] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1711883789072
2024-03-31T16:46:29.075+05:30 INFO 25024 --- [embedded-kafka] [| adminclient-2] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-2] Node -1 disconnected.
2024-03-31T16:46:29.075+05:30 WARN 25024 --- [embedded-kafka] [| adminclient-2] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2024-03-31T16:46:29.191+05:30 INFO 25024 --- [embedded-kafka] [| adminclient-2] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-2] Node -1 disconnected.
2024-03-31T16:46:29.191+05:30 WARN 25024 --- [embedded-kafka] [| adminclient-2] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2024-03-31T16:46:29.300+05:30 INFO 25024 --- [embedded-kafka] [| adminclient-2] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-2] Node -1 disconnected.
2024-03-31T16:46:29.300+05:30 WARN 25024 --- [embedded-kafka] [| adminclient-2] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2024-03-31T16:46:29.518+05:30 INFO 25024 --- [embedded-kafka] [| adminclient-2] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-2] Node -1 disconnected.
The ZooKeeper logs are also missing from the startup output of the same run:
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.2.4)
2024-03-31T16:46:26.369+05:30 INFO 25024 --- [embedded-kafka] [ main] k.utils.Log4jControllerRegistration$ : Registered kafka:type=kafka.Log4jController MBean
2024-03-31T16:46:26.387+05:30 INFO 25024 --- [embedded-kafka] [ main] org.apache.zookeeper.common.X509Util : Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation
2024-03-31T16:46:26.501+05:30 INFO 25024 --- [embedded-kafka] [-kit-executor-1] kafka.server.ControllerServer : Formatting C:\Users\sandeepm\AppData\Local\Temp\kafka-16755125128047995461\controller_0 with metadata.version 3.3-IV0.
2024-03-31T16:46:26.503+05:30 INFO 25024 --- [embedded-kafka] [-kit-executor-3] kafka.server.BrokerServer : [BrokerServer id=0] Transition from SHUTDOWN to STARTING
2024-03-31T16:46:26.503+05:30 INFO 25024 --- [embedded-kafka] [-kit-executor-2] kafka.server.ControllerServer : [ControllerServer id=0] Starting controller
2024-03-31T16:46:26.504+05:30 INFO 25024 --- [embedded-kafka] [-kit-executor-3] kafka.server.SharedServer : [SharedServer id=0] Starting SharedServer
2024-03-31T16:46:26.524+05:30 INFO 25024 --- [embedded-kafka] [-kit-executor-2] o.a.k.s.network.EndpointReadyFutures : authorizerStart completed for endpoint CONTROLLER. Endpoint is now READY.
2024-03-31T16:46:26.596+05:30 INFO 25024 --- [embedded-kafka] [-kit-executor-3] kafka.log.UnifiedLog$ : [LogLoader partition=__cluster_metadata-0, dir=C:\Users\sandeepm\AppData\Local\Temp\kafka-16755125128047995461\controller_0] Loading producer state till offset 0 with message format version 2
2024-03-31T16:46:26.597+05:30 INFO 25024 --- [embedded-kafka] [-kit-executor-3] kafka.log.UnifiedLog$ : [LogLoader partition=__cluster_metadata-0, dir=C:\Users\sandeepm\AppData\Local\Temp\kafka-16755125128047995461\controller_0] Reloading from producer snapshot and rebuilding producer state from offset 0
2024-03-31T16:46:26.597+05:30 INFO 25024 --- [embedded-kafka] [-kit-executor-3] kafka.log.UnifiedLog$ : [LogLoader partition=__cluster_metadata-0, dir=C:\Users\sandeepm\AppData\Local\Temp\kafka-16755125128047995461\controller_0] Producer state recovery took 0ms for snapshot load and 0ms for segment recovery from offset 0
2024-03-31T16:46:26.634+05:30 INFO 25024 --- [embedded-kafka] [-kit-executor-3] kafka.raft.KafkaMetadataLog$ : Initialized snapshots with IDs SortedSet() from C:\Users\sandeepm\AppData\Local\Temp\kafka-16755125128047995461\controller_0\__cluster_metadata-0
2024-03-31T16:46:26.671+05:30 INFO 25024 --- [embedded-kafka] [piration-reaper] ExpirationService$ExpiredOperationReaper : [raft-expiration-reaper]: Starting
2024-03-31T16:46:26.716+05:30 INFO 25024 --- [embedded-kafka] [-kit-executor-3] org.apache.kafka.raft.QuorumState : [RaftManager id=0] Completed transition to Unattached(epoch=0, voters=[0], electionTimeoutMs=1955) from null
2024-03-31T16:46:26.722+05:30 INFO 25024 --- [embedded-kafka] [-kit-executor-3] org.apache.kafka.raft.QuorumState : [RaftManager id=0] Completed transition to CandidateState(localId=0, epoch=1, retries=1, voteStates={0=GRANTED}, highWatermark=Optional.empty, electionTimeoutMs=1252) from Unattached(epoch=0, voters=[0], electionTimeoutMs=1955)
2024-03-31T16:46:26.727+05:30 INFO 25024 --- [embedded-kafka] [-kit-executor-3] org.apache.kafka.raft.QuorumState : [RaftManager id=0] Completed transition to Leader(localId=0, epoch=1, epochStartOffset=0, highWatermark=Optional.empty, voterStates={0=ReplicaState(nodeId=0, endOffset=Optional.empty, lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true)}) from CandidateState(localId=0, epoch=1, retries=1, voteStates={0=GRANTED}, highWatermark=Optional.empty, electionTimeoutMs=1252)
2024-03-31T16:46:26.800+05:30 INFO 25024 --- [embedded-kafka] [-kit-executor-2] kafka.network.ConnectionQuotas : Updated connection-accept-rate max connection creation rate to 2147483647
I suspect the problem is related to the dependency versions, but I can't work out what changed. I want to run all my test cases against an embedded Kafka broker, without using an external Kafka cluster. Could you please help me with this? Thank you!