I have a Kafka cluster and a Spring Boot application that is configured for exactly-once semantics (EOS). The application consumes from topic A, performs some business logic, then produces to topic B. The issue I am facing is that if EOS fails to write to topic B, it retries and all my business logic is executed again. This is a problem for me, as I then duplicate the API call. Is there a flag of some sort that indicates when a retry occurs, so I can skip the business logic and go straight to the produce step?
KafkaConsumerConfig
/**
 * Spring configuration for the consuming side: declares the traced consumer
 * factory and the batch listener container factory that runs listeners inside
 * a Kafka transaction.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.server}")
    String server;

    @Value("${kafka.consumer.groupid}")
    String groupid;

    @Autowired
    Tracer tracer;

    /**
     * Creates the consumer factory, wrapped so that consumers are traced.
     *
     * @return a tracing consumer factory delegating to a {@link DefaultKafkaConsumerFactory}
     */
    @Bean
    public ConsumerFactory<String, TransactionAvroEntity> consumerFactory() {
        DefaultKafkaConsumerFactory<String, TransactionAvroEntity> delegate = new DefaultKafkaConsumerFactory<>(
                consumerProperties(),
                new StringDeserializer(),
                new AvroDeserializer<>(TransactionAvroEntity.class));
        return new TracingConsumerFactory<>(delegate, tracer);
    }

    /**
     * Creates the listener container factory: batch listener, batch ack mode,
     * EOS mode ALPHA, transactional, concurrency 5, not started automatically.
     *
     * @param transactionManager the Kafka-aware transaction manager set on the container properties
     * @return the configured container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, TransactionAvroEntity> kafkaListenerContainerFactory(
            KafkaAwareTransactionManager<Object, Object> transactionManager) {
        ConcurrentKafkaListenerContainerFactory<String, TransactionAvroEntity> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();

        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setAutoStartup(false);
        containerFactory.setBatchListener(true);
        containerFactory.setConcurrency(5);

        containerFactory.getContainerProperties().setAckMode(AckMode.BATCH);
        containerFactory.getContainerProperties().setEosMode(EOSMode.ALPHA);
        containerFactory.getContainerProperties().setTransactionManager(transactionManager);

        return containerFactory;
    }

    /** Builds the raw Kafka consumer property map used by {@link #consumerFactory()}. */
    private Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<>();

        // Connection and group membership.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);

        // Deserialization.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);

        // Offset reset and transactional visibility.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        // Polling, heartbeat and session timing.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);

        return props;
    }
}
KafkaProducerConfig
/**
 * Spring configuration for the producing side: declares the traced,
 * transactional producer factory, the {@link KafkaTemplate} built on it, and
 * the Kafka transaction manager.
 */
@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.server}")
    String server;

    @Autowired
    public Tracer tracer;

    /**
     * Creates the producer factory, wrapped so that producers are traced.
     *
     * @return a tracing producer factory delegating to a {@link DefaultKafkaProducerFactory}
     */
    @Bean
    public ProducerFactory<String, TransactionAvroEntity> producerFactory() {
        DefaultKafkaProducerFactory<String, TransactionAvroEntity> delegate =
                new DefaultKafkaProducerFactory<>(producerProperties());
        return new TracingProducerFactory<>(delegate, tracer);
    }

    /**
     * Creates the template used to send records.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, TransactionAvroEntity> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Creates the transaction manager.
     *
     * @param producerFactory the producer factory the transaction manager is built on
     * @return a {@link KafkaTransactionManager} for the given factory
     */
    @Bean
    public KafkaAwareTransactionManager<?, ?> kafkaTransactionManager(
            ProducerFactory<String, TransactionAvroEntity> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    /** Builds the raw Kafka producer property map used by {@link #producerFactory()}. */
    private Map<String, Object> producerProperties() {
        Map<String, Object> props = new HashMap<>();

        // Connection and serialization.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class.getName());
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.EXAMPLE.config.KafkaCustomPatitioner");

        // Idempotence, acknowledgements and transactions.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranDec-1");
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 120000);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

        // Batching, compression and buffering.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        props.put(ProducerConfig.LINGER_MS_CONFIG, "200");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(256 * 1024));
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, String.valueOf(32768 * 1024));

        // Request timeout.
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);

        return props;
    }
}
KafkaTopicProducer
/**
 * Thin service around {@link KafkaTemplate} that publishes a single
 * {@code TransactionAvroEntity} to a given topic.
 */
@Service
public class KafkaTopicProducer {

    @Autowired
    private KafkaTemplate<String, TransactionAvroEntity> kafkaTemplate;

    /**
     * Sends {@code payload} to {@code topic}, keyed by {@code payload.getNumber()},
     * with no explicit partition and carrying the supplied headers.
     *
     * @param payload the entity to publish; its number is used as the record key
     * @param topic   the destination topic name
     * @param headers the headers to attach to the outgoing record
     */
    public void topicProducer(TransactionAvroEntity payload, String topic, Headers headers) {
        // Partition is passed as null, so it is not fixed by this method.
        ProducerRecord<String, TransactionAvroEntity> outbound =
                new ProducerRecord<>(topic, null, payload.getNumber(), payload, headers);
        kafkaTemplate.send(outbound);
    }
}
KafkaConsumer
/**
 * Batch Kafka listener that, for every record received, calls
 * {@code enrichmentService.enrichAndValidateTransactions} and then hands a
 * payload to {@code kafkaTopicProducer.topicProducer} together with the
 * incoming record's headers.
 *
 * NOTE(review): every invocation of {@link #consume} walks the whole batch
 * from the first record and runs the enrichment step again. If the same batch
 * is delivered more than once (e.g. after a failed send/commit), any
 * non-idempotent side effect in the enrichment step is repeated. Consider
 * recording topic/partition/offset for processed records so repeated work can
 * be detected and skipped.
 */
@Service
public class KafkaConsumerTransaction {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumerTransaction.class);
// Performs the per-record enrichment/validation step.
@Autowired
TransactionEnrichmentService enrichmentService;
// Publishes the outgoing record for each consumed record.
@Autowired
KafkaTopicProducer kafkaTopicProducer;
// Injected from configuration; not referenced in the visible code of this class.
@Value("${kafka.producer.topic.transactionDecouplingException}")
String exceptionTopic;
// Not referenced in the visible code of this class.
@Autowired
JaegerTrace tracer;
// Not referenced in the visible code of this class.
@Autowired
ObjectMapper objectMapper;
/**
 * Processes one batch of records, sending one outgoing record per incoming record.
 *
 * @param records  the batch of consumed records
 * @param consumer the Kafka consumer; unused in the visible code
 * @return always {@code true}
 */
@KafkaListener(topics = "${kafka.consumer.topic.transaction}", groupId = "${kafka.consumer.groupid}", id = "${kafka.consumer.listenerid}")
public boolean consume(List<ConsumerRecord<String, TransactionAvroEntity>> records, Consumer<?, ?> consumer) {
// Handle each record of the batch in order.
for (ConsumerRecord<String, TransactionAvroEntity> record : records) {
Integer partition = record.partition();
Long offset = record.offset();
TransactionAvroEntity te = record.value();
try {
// Branch on the boolean result of the enrichment/validation call.
if (enrichmentService.enrichAndValidateTransactions(te, partition, offset, record)) {
//Do some logic
} else {
//Do some logic
}
} catch (Exception e) {
// NOTE(review): any exception from the enrichment step is caught here; unless the
// elided handling rethrows, execution continues to the send below.
//Do some logic
}
// NOTE(review): `t` and `topic` are not declared anywhere in the visible code —
// presumably `te` and a configured destination topic were intended; confirm.
kafkaTopicProducer.topicProducer(t, topic, record.headers());
}
return true;
}
}
EOS applies only to the entire `consume->process->publish` sequence. The `consume->process` part is at-least-once. You need to make the process part idempotent.
A common technique is to store the topic/partition/offset along with other data in the `process` step so you can check whether this record has already been processed. You can enable the delivery attempt header, but that will only tell you that this is a redelivery; it can't tell you whether the failure was at the start of the `process` step or at the end (during the send).