- I have set up an Apache Kafka cluster with three hosts.
- I have created a topic with replication factor 3 and partition count 8.
- I have divided my data equally among the 8 partitions.
- I have 8 single-threaded consumers, and I am leaving it to Kafka to assign partitions to each consumer.
- I have set enable.auto.commit to false and the container AckMode to RECORD.
PROBLEM:
When I stop and restart the consumers, the number of duplicate messages is in the hundreds. I expect no more than 8 duplicates, since I have 8 consumers and AckMode is RECORD.
Please help!!!
This is my consumer config.
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.support.serializer.JsonDeserializer;
/**
 * Spring configuration for the market-data Kafka consumers.
 *
 * <p>Declares the listener container factory referenced by the
 * {@code @KafkaListener} methods, together with the consumer factory and the
 * raw consumer property map it is built from.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /** Broker address list, injected from {@code kafka.bootstrapAddress}. */
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    /** Consumer group id, injected from {@code kafka.market.persist.consumergroup}. */
    @Value(value = "${kafka.market.persist.consumergroup}")
    private String marketDataPersistConsumer;

    /**
     * Builds the container factory for {@code Quote} listeners.
     *
     * <p>The factory uses the consumer factory for the configured group id, and its
     * container properties are set to {@code AckMode.RECORD} with synchronous commits.
     *
     * @return the configured listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Quote> marketDataKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Quote> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        ConsumerFactory<String, Quote> quoteConsumerFactory = consumerFactory(marketDataPersistConsumer);
        containerFactory.setConsumerFactory(quoteConsumerFactory);
        containerFactory.getContainerProperties().setAckMode(AckMode.RECORD);
        containerFactory.getContainerProperties().setSyncCommits(true);
        return containerFactory;
    }

    /**
     * Creates a consumer factory with String keys and JSON-deserialized {@code Quote} values.
     *
     * <p>NOTE(review): this is a {@code @Bean} method that takes a plain {@code String};
     * confirm how the application context resolves {@code groupId} when it instantiates
     * this bean itself.
     *
     * @param groupId consumer group id placed into the consumer properties
     * @return a consumer factory for {@code Quote} records
     */
    @Bean
    public ConsumerFactory<String, Quote> consumerFactory(String groupId) {
        Map<String, Object> settings = consumerConfigs(groupId);
        StringDeserializer keyDeserializer = new StringDeserializer();
        JsonDeserializer<Quote> valueDeserializer = new JsonDeserializer<>(Quote.class);
        return new DefaultKafkaConsumerFactory<>(settings, keyDeserializer, valueDeserializer);
    }

    /**
     * Assembles the consumer properties: bootstrap servers, group id, and
     * auto-commit turned off.
     *
     * @param groupId consumer group id to store under {@code ConsumerConfig.GROUP_ID_CONFIG}
     * @return a new mutable map holding the consumer properties
     */
    @Bean
    public Map<String, Object> consumerConfigs(String groupId) {
        Map<String, Object> settings = new HashMap<>();
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return settings;
    }

    /**
     * Exposes the listener object that carries the {@code @KafkaListener} methods.
     *
     * @return a new {@code MessageListener}
     */
    @Bean
    public MessageListener messageListener() {
        return new MessageListener();
    }
}
And this is my message listener:
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
/**
 * Kafka listener endpoints for market-data quotes.
 *
 * <p>Eight identically configured {@code @KafkaListener} methods are declared on the
 * same topic and container factory. Each one hands the record to a shared helper that
 * saves the quote and then logs it.
 */
@Component
public class MessageListener {

    private static final Logger LOG = Logger.getLogger("debugLogger");

    /**
     * Saves the quote and then logs it together with its source partition.
     * If saving throws, the log line is not written.
     *
     * @param quote     the deserialized record payload
     * @param partition the partition the record was received from
     */
    private void persistAndLog(Quote quote, int partition) {
        save(quote);
        LOG.info("Consumed Quote:" + quote + " from partition:" + partition);
    }

    /** Listener endpoint 0; saves and logs the received quote. */
    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerZero(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistAndLog(quote, partition);
    }

    /** Listener endpoint 1; saves and logs the received quote. */
    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerOne(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistAndLog(quote, partition);
    }

    /** Listener endpoint 2; saves and logs the received quote. */
    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerTwo(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistAndLog(quote, partition);
    }

    /** Listener endpoint 3; saves and logs the received quote. */
    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerThree(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistAndLog(quote, partition);
    }

    /** Listener endpoint 4; saves and logs the received quote. */
    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerFour(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistAndLog(quote, partition);
    }

    /** Listener endpoint 5; saves and logs the received quote. */
    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerFive(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistAndLog(quote, partition);
    }

    /** Listener endpoint 6; saves and logs the received quote. */
    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerSix(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistAndLog(quote, partition);
    }

    /** Listener endpoint 7; saves and logs the received quote. */
    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerSeven(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistAndLog(quote, partition);
    }
}
Kafka debug logs:
11:32:26 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit > list: {} 11:32:27 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - > > Committing: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15649, metadata=''}, market-data-topic-dev3-3=OffsetAndMetadata{offset=15408, metadata=''}} 11:32:27 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15648, metadata=''}, market-data-topic-dev3-1=OffsetAndMetadata{offset=15269, metadata=''}} 11:32:27 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15631, metadata=''}, market-data-topic-dev3-5=OffsetAndMetadata{offset=15515, metadata=''}} 11:32:27 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15657, metadata=''}, market-data-topic-dev3-7=OffsetAndMetadata{offset=15631, metadata=''}} 11:32:27 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-0, market-data-topic-dev3-1] 11:32:27 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-2, market-data-topic-dev3-3] 11:32:27 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-4, market-data-topic-dev3-5] 11:32:27 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-6, market-data-topic-dev3-7] 11:32:27 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:27 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {} 11:32:29 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:15534 - Partition:6, lastPrice=6, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=15534, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMessage=null, 
tradeVolume=null], headers={kafka_offset=15657, kafka_receivedMessageKey=null, kafka_receivedPartitionId=6, kafka_receivedTopic=market-data-topic-dev3}]] 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15775, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15775, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15755, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15755, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15783, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15783, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15776, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15776, metadata=''}} Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinPrepare INFO: Revoking previously assigned partitions [market-data-topic-dev3-2, market-data-topic-dev3-3] for group market-data-persist-cg-dev3 2017-08-30 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:1063 - Stopping invoker Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinPrepare INFO: Revoking previously assigned partitions [market-data-topic-dev3-4, market-data-topic-dev3-5] for group market-data-persist-cg-dev3 2017-08-30 11:32:39 DEBUG 
KafkaMessageListenerContainer$ListenerConsumer:1063 - Stopping invoker 2017-08-30 11:32:39 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:16440 - Partition:0, lastPrice=0, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=16440, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMessage=null, tradeVolume=null], headers={kafka_offset=15777, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=market-data-topic-dev3}]] Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinPrepare INFO: Revoking previously assigned partitions [market-data-topic-dev3-6, market-data-topic-dev3-7] for group market-data-persist-cg-dev3 2017-08-30 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:1063 - Stopping invoker 2017-08-30 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:1082 - Invoker stopped 2017-08-30 11:32:39 INFO KafkaMessageListenerContainer:242 - partitions revoked:[market-data-topic-dev3-6, market-data-topic-dev3-7] 2017-08-30 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15784, metadata=''}} Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinPrepare INFO: Revoking previously assigned partitions [market-data-topic-dev3-0, market-data-topic-dev3-1] for group market-data-persist-cg-dev3 2017-08-30 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15784, metadata=''}} 2017-08-30 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:1063 - Stopping invoker Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.AbstractCoordinator sendJoinGroupRequest INFO: (Re-)joining group market-data-persist-cg-dev3 11:32:39 DEBUG 
KafkaMessageListenerContainer$ListenerConsumer:1082 - Invoker stopped 11:32:39 INFO KafkaMessageListenerContainer:242 - partitions revoked:[market-data-topic-dev3-2, market-data-topic-dev3-3] 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15777, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15777, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:1082 - Invoker stopped 11:32:39 INFO KafkaMessageListenerContainer:242 - partitions revoked:[market-data-topic-dev3-4, market-data-topic-dev3-5] 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15757, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15757, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:1082 - Invoker stopped 11:32:39 INFO KafkaMessageListenerContainer:242 - partitions revoked:[market-data-topic-dev3-0, market-data-topic-dev3-1] 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15778, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15778, metadata=''}} Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinComplete INFO: Setting newly assigned partitions [market-data-topic-dev3-2] for group market-data-persist-cg-dev3 Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinComplete INFO: Setting newly assigned partitions [market-data-topic-dev3-6] for group market-data-persist-cg-dev3 11:32:39 DEBUG 
KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-1=OffsetAndMetadata{offset=15269, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15778, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-3=OffsetAndMetadata{offset=15408, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-7=OffsetAndMetadata{offset=15631, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15777, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15757, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15784, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-5=OffsetAndMetadata{offset=15515, metadata=''}} 11:32:39 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-1] 11:32:39 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-0] 11:32:39 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-3] 11:32:39 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-7] 11:32:39 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-4] 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {} 11:32:39 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-6] 11:32:39 INFO 
KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-2] 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {} 11:32:39 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-5] 11:32:39 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:16448 - Partition:0, lastPrice=0, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=16448, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMessage=null, tradeVolume=null], headers={kafka_offset=15778, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=market-data-topic-dev3}]] 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 500 records 11:32:39 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:16324 - Partition:4, lastPrice=4, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=16324, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMessage=null, tradeVolume=null], headers={kafka_offset=15757, kafka_receivedMessageKey=null, kafka_receivedPartitionId=4, kafka_receivedTopic=market-data-topic-dev3}]] 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15909, metadata=''}} 11:32:49 DEBUG 
KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15909, metadata=''}} 11:32:49 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:17474 - Partition:2, lastPrice=2, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=17474, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMessage=null, tradeVolume=null], headers={kafka_offset=15907, kafka_receivedMessageKey=null, kafka_receivedPartitionId=2, kafka_receivedTopic=market-data-topic-dev3}]] 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15883, metadata=''}} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15883, metadata=''}} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15907, metadata=''}} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15907, metadata=''}} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15914, metadata=''}} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15914, metadata=''}} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-5=OffsetAndMetadata{offset=15645, metadata=''}} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-5=OffsetAndMetadata{offset=15645, metadata=''}} 11:32:49 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:17598 - Partition:6, 
lastPrice=6, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=17598, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMessage=null, tradeVolume=null], headers={kafka_offset=15915, kafka_receivedMessageKey=null, kafka_receivedPartitionId=6, kafka_receivedTopic=market-data-topic-dev3}]] 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-1=OffsetAndMetadata{offset=15395, metadata=''}} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-1=OffsetAndMetadata{offset=15395, metadata=''}} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-3=OffsetAndMetadata{offset=15536, metadata=''}} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-3=OffsetAndMetadata{offset=15536, metadata=''}} 11:32:49 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:17482 - Partition:2, lastPrice=2, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=17482, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMess 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-7=OffsetAndMetadata{offset=15774, metadata=''}} 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-7=OffsetAndMetadata{offset=15774, metadata=''}} 11:32:50 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:13489 - Partition:1, lastPrice=1, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=13489, closeTime=null, localCloseTime=null, 
localCloseTimeEpoch=null, messageState=null, logMessage=null, tradeVolume=null], headers={kafka_offset=15408, kafka_receivedMessageKey=null, kafka_receivedPartitionId=1, kafka_receivedTopic=market-data-topic-dev3}]] 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15922, metadata=''}} 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15922, metadata=''}} 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15896, metadata=''}} 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15896, metadata=''}} 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15920, metadata=''}} 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15920, metadata=''}} 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15928, metadata=''}} 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15928, metadata=''}} 11:32:50 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:17608 - Partition:0, lastPrice=0, systemTime=null, tickTime=null, localTickTime=null, 
localTickTimeEpoch=null, closePrice=17608, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMessage=null, tradeVolume=null], headers={kafka_offset=15923, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=market-data-topic-dev3}]] 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-5=OffsetAndMetadata{offset=15659, metadata=''}} 11:32:50 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:14659 - Partition:3, lastPrice=3, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=14659, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMessage=null, tradeVolume=null], headers={kafka_offset=15550, kafka_receivedMessageKey=null, kafka_receivedPartitionId=3, kafka_receivedTopic=market-data-topic-dev3}]] 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-5=OffsetAndMetadata{offset=15659, metadata=''}} 11:32:50 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:15557 - Partition:5, lastPrice=5, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=15557, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMessage=null, tradeVolume=null], headers={kafka_offset=15660, kafka_receivedMessageKey=null, kafka_receivedPartitionId=5, kafka_receivedTopic=market-data-topic-dev3}]] 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: 
{market-data-topic-dev3-3=OffsetAndMetadata{offset=15550, metadata=''}} 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-3=OffsetAndMetadata{offset=15550, metadata=''}} 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-1=OffsetAndMetadata{offset=15408, metadata=''}} 11:32:50 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:16559 - Partition:7, lastPrice=7, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=16559, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMessage=null, tradeVolume=null], headers={kafka_offset=15781, kafka_receivedMessageKey=null, kafka_receivedPartitionId=7, kafka_receivedTopic=market-data-topic-dev3}]] 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-1=OffsetAndMetadata{offset=15408, metadata=''}}