Kafka EOS retry flag

373 views Asked by At

I have a Kafka cluster and a Spring Boot application that is configured for exactly-once semantics (EOS). The application consumes from topic A, performs some business logic, and then produces to topic B. The issue I am facing is that if EOS fails to write to topic B, it retries and all my business logic is executed again. This is a problem for me because I then duplicate the API call. Is there a flag of some sort that is set when a retry occurs, so that I can skip the business logic and go straight to producing?

KafkaConsumerConfig

/**
 * Spring configuration for the consuming side of the application.
 *
 * <p>Declares the {@link ConsumerFactory} used to read
 * {@code TransactionAvroEntity} records and the listener container factory
 * that runs batch listeners inside Kafka transactions.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /** Bootstrap server list, injected from {@code kafka.server}. */
    @Value("${kafka.server}")
    String server;

    /** Consumer group id, injected from {@code kafka.consumer.groupid}. */
    @Value("${kafka.consumer.groupid}")
    String groupid;

    /** Tracer handed to the tracing wrapper around the consumer factory. */
    @Autowired
    Tracer tracer;

    /**
     * Creates the consumer factory for String keys and Avro-encoded
     * {@code TransactionAvroEntity} values.
     *
     * @return a {@code TracingConsumerFactory} wrapping a
     *         {@code DefaultKafkaConsumerFactory} built from the properties below
     */
    @Bean
    public ConsumerFactory<String, TransactionAvroEntity> consumerFactory() {
        Map<String, Object> props = new HashMap<>();

        // Connection, group membership and (de)serialization.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);

        // Offset/isolation behaviour and polling/session tuning.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);

        // Deserializer instances are supplied explicitly in addition to the class properties above.
        ConsumerFactory<String, TransactionAvroEntity> delegate = new DefaultKafkaConsumerFactory<>(
                props,
                new StringDeserializer(),
                new AvroDeserializer<>(TransactionAvroEntity.class));

        return new TracingConsumerFactory<>(delegate, tracer);
    }

    /**
     * Creates the container factory used by {@code @KafkaListener} methods.
     *
     * <p>Containers are not auto-started, deliver records as batches, use
     * {@code AckMode.BATCH} and {@code EOSMode.ALPHA}, run with the supplied
     * transaction manager, and use a concurrency of 5.
     *
     * @param transactionManager the Kafka-aware transaction manager set on the container properties
     * @return the configured listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, TransactionAvroEntity> kafkaListenerContainerFactory(
            KafkaAwareTransactionManager<Object, Object> transactionManager) {

        ConcurrentKafkaListenerContainerFactory<String, TransactionAvroEntity> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();

        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setAutoStartup(false);
        containerFactory.setBatchListener(true);
        containerFactory.getContainerProperties().setAckMode(AckMode.BATCH);
        containerFactory.getContainerProperties().setEosMode(EOSMode.ALPHA);
        containerFactory.getContainerProperties().setTransactionManager(transactionManager);
        containerFactory.setConcurrency(5);

        return containerFactory;
    }

}

KafkaProducerConfig

/**
 * Spring configuration for the producing side of the application.
 *
 * <p>Declares the transactional {@link ProducerFactory}, the
 * {@link KafkaTemplate} built on it, and the Kafka transaction manager.
 */
@Configuration
public class KafkaProducerConfig {

    /** Bootstrap server list, injected from {@code kafka.server}. */
    @Value("${kafka.server}")
    String server;

    /** Tracer handed to the tracing wrapper around the producer factory. */
    @Autowired
    public Tracer tracer;

    /**
     * Creates the producer factory for String keys and Avro-encoded
     * {@code TransactionAvroEntity} values.
     *
     * @return a {@code TracingProducerFactory} wrapping a
     *         {@code DefaultKafkaProducerFactory} built from the properties below
     */
    @Bean
    public ProducerFactory<String, TransactionAvroEntity> producerFactory() {
        Map<String, Object> props = new HashMap<>();

        // Connection and serialization.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class.getName());

        // Idempotence / transaction settings.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranDec-1");

        // Batching, compression, timeouts and buffering.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        props.put(ProducerConfig.LINGER_MS_CONFIG, "200");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(256 * 1024));
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 120000);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.toString(32768 * 1024));

        // Custom partitioner, referenced by fully qualified class name.
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.EXAMPLE.config.KafkaCustomPatitioner");

        ProducerFactory<String, TransactionAvroEntity> delegate = new DefaultKafkaProducerFactory<>(props);
        return new TracingProducerFactory<>(delegate, tracer);
    }

    /**
     * Creates the template used to publish records.
     *
     * @return a {@link KafkaTemplate} backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, TransactionAvroEntity> kafkaTemplate() {
        ProducerFactory<String, TransactionAvroEntity> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    /**
     * Creates the Kafka transaction manager.
     *
     * @param producerFactory the producer factory the transaction manager is built on
     * @return a {@code KafkaTransactionManager} for the given factory
     */
    @Bean
    public KafkaAwareTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<String, TransactionAvroEntity> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

}

KafkaTopicProducer

/**
 * Thin service around {@link KafkaTemplate} that publishes a single
 * {@code TransactionAvroEntity} to a given topic.
 */
@Service
public class KafkaTopicProducer {

    @Autowired
    private KafkaTemplate<String, TransactionAvroEntity> kafkaTemplate;

    /**
     * Sends {@code payload} to {@code topic}, keyed by {@code payload.getNumber()}.
     *
     * <p>The value returned by {@code kafkaTemplate.send} is not inspected here.
     *
     * @param payload the entity to publish; also supplies the record key
     * @param topic   the destination topic name
     * @param headers the headers to attach to the outgoing record
     */
    public void topicProducer(TransactionAvroEntity payload, String topic, Headers headers) {
        // Second argument (partition) is null: no explicit partition is chosen here.
        ProducerRecord<String, TransactionAvroEntity> outbound =
                new ProducerRecord<>(topic, null, payload.getNumber(), payload, headers);

        kafkaTemplate.send(outbound);
    }
}

KafkaConsumer

/**
 * Kafka batch listener: for every record in a delivered batch it calls
 * {@code enrichmentService.enrichAndValidateTransactions(...)} and then hands a
 * payload to {@code kafkaTopicProducer.topicProducer(...)}.
 *
 * NOTE(review): the branch bodies and the catch body are elided in this
 * snippet ("//Do some logic"), so only the visible control flow is described.
 */
@Service
public class KafkaConsumerTransaction {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerTransaction.class);

    @Autowired
    TransactionEnrichmentService enrichmentService;

    @Autowired
    KafkaTopicProducer kafkaTopicProducer;

    // Topic name injected from configuration; not referenced in the visible code
    // (presumably used by the elided error handling -- confirm).
    @Value("${kafka.producer.topic.transactionDecouplingException}")
    String exceptionTopic;

    @Autowired
    JaegerTrace tracer;

    @Autowired
    ObjectMapper objectMapper;



    /**
     * Processes one batch of records delivered by the listener container.
     *
     * NOTE(review): if the same batch is delivered again (the scenario described
     * as a retry after a failed write), this loop re-runs the enrichment call and
     * the elided logic for every record; nothing visible here detects a record
     * that was already processed.
     *
     * @param records  the batch of consumed records
     * @param consumer the Kafka consumer; not used in the visible code
     * @return always {@code true} in the visible code
     */
    @KafkaListener(topics = "${kafka.consumer.topic.transaction}", groupId = "${kafka.consumer.groupid}", id = "${kafka.consumer.listenerid}")
    public boolean consume(List<ConsumerRecord<String, TransactionAvroEntity>> records, Consumer<?, ?> consumer) {
        // loop through batch read
        for (ConsumerRecord<String, TransactionAvroEntity> record : records) {

            Integer partition = record.partition();
            Long offset = record.offset();
            TransactionAvroEntity te = record.value();

            try {

                

// The boolean result of the enrichment/validation call selects which elided branch runs.
if (enrichmentService.enrichAndValidateTransactions(te, partition, offset, record)) {
    
//Do some logic
                    } else {
//Do some logic
                    }
    
                } catch (Exception e) {
    //Do some logic

                }
// The send sits after the try/catch, so it is reached for either branch and also
// after a caught exception (unless the elided catch body rethrows or exits).
// NOTE(review): `t` and `topic` are not declared anywhere in the visible code;
// presumably `te` and a destination-topic field were intended -- confirm.
kafkaTopicProducer.topicProducer(t, topic, record.headers());
            }
            return true;
        }
    
    }
1

There is 1 answer

0
Gary Russell On

EOS applies to the entire consume->process->publish only.

The consume->process part is at least once.

You need to make the process part idempotent.

A common technique is to store the topic/partition/offset along with other data in the process so you can check whether this record has already been processed.

You can enable the delivery attempt header, but that will only tell you that this is a redelivery; it can't tell you whether the failure was at the start of the process or at the end (during the send).