Apache Kafka System Error Handling


We are trying to implement Kafka as our message broker solution. We are deploying our Spring Boot microservices on IBM Bluemix, whose internal message broker implementation is Kafka version 0.10. Since my experience is more on the JMS/ActiveMQ side, I was wondering: what is the ideal way to handle system-level errors in the Java consumers?

Here is how we have implemented it currently:

Consumer properties

enable.auto.commit=false
auto.offset.reset=latest

We are using the default properties for

max.partition.fetch.bytes
session.timeout.ms
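
For reference, this configuration is assembled roughly as follows (a sketch only: in our code the configuration lives behind a wrapper object, and the bootstrap servers, group id, and deserializer classes below are placeholders, not our actual values):

    import java.util.Properties;

    final Properties consumerConfig = new Properties();
    consumerConfig.put("bootstrap.servers", "kafka-broker:9092"); // placeholder
    consumerConfig.put("group.id", "comms-event-consumers");      // placeholder
    consumerConfig.put("enable.auto.commit", "false");
    consumerConfig.put("auto.offset.reset", "latest");
    consumerConfig.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
    consumerConfig.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
    // max.partition.fetch.bytes and session.timeout.ms stay at their defaults.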

Kafka Consumer

We are spinning up three threads per topic, all with the same groupId, i.e. one KafkaConsumer instance per thread. We have only one partition as of now. The consumer code looks like this in the constructor of the thread class:

    kafkaConsumer = new KafkaConsumer<String, String>(properties);

    final List<String> topicList = new ArrayList<String>();
    topicList.add(properties.getTopic());

    kafkaConsumer.subscribe(topicList, new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
        }

        @Override
        public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
            try {
                logger.info("Partitions assigned, consumer seeking to end.");

                for (final TopicPartition partition : partitions) {
                    final long position = kafkaConsumer.position(partition);
                    logger.info("Current position: " + position);

                    logger.info("Seeking to end...");
                    kafkaConsumer.seekToEnd(Arrays.asList(partition));
                    logger.info("Position after seekToEnd: " + kafkaConsumer.position(partition));
                    // Restore the position recorded before the seek.
                    kafkaConsumer.seek(partition, position);
                }
                logger.info("Consumer can now begin consuming messages.");
            } catch (final Exception e) {
                logger.error("Error while seeking during partition assignment.", e);
            }
        }
    });

The actual reading happens in the run method of the thread:

    try {
        // Poll the Kafka consumer every second.
        final ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);

        // Iterate through all the messages received and process their content.
        for (final TopicPartition partition : records.partitions()) {

            final List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            logger.info("Consumer is alive and is processing " + partitionRecords.size() + " records");

            for (final ConsumerRecord<String, String> record : partitionRecords) {
                logger.info("Processing topic " + record.topic() + " for key " + record.key() + " at offset " + record.offset());

                final Class<? extends Event> resourceClass = eventProcessors.getResourceClass();
                final Object obj = converter.convertToObject(record.value(), resourceClass);
                if (obj != null) {
                    logger.info("Event: " + obj + " acquired by " + Thread.currentThread().getName());
                    final CommsEvent event = resourceClass.cast(converter.convertToObject(record.value(), resourceClass));
                    final MessageResults results = eventProcessors.processEvent(event);
                    if ("Success".equals(results.getStatus())) {
                        // Commit the processed message, which advances the committed offset.
                        kafkaConsumer.commitSync();
                        logger.info("Message processed successfully");
                    } else {
                        // Rewind to the failed record so the next poll returns it again.
                        kafkaConsumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
                        logger.error("Error processing message: {} with error: {}, resetting offset to {}",
                                obj, results.getError().getMessage(), record.offset());
                        break;
                    }
                }
            }
        }
        // TODO add return

    } catch (final Exception e) {
        logger.error("Consumer has failed with exception: " + e, e);
        shutdown();
    }

You will notice the EventProcessor, a service class that processes each record and in most cases commits it to the database. If the processor throws an error (a system exception or a ValidationException), we do not commit but programmatically seek back to that offset, so that a subsequent poll will return from that offset for that group id.
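
One detail worth noting about the commit above: commitSync() with no arguments commits the latest offsets returned by the last poll(), not just the record that was processed. For illustration, committing a single record's offset explicitly would look roughly like this (a sketch using the commitSync overload that takes an offset map; kafkaConsumer and record come from the loop above):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // Commit only up to the record just processed; the committed value is the
    // offset of the next record to read, hence the +1.
    kafkaConsumer.commitSync(Collections.singletonMap(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)));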

The doubt now is: is this the right approach? If we get an error and set the offset back, then until that error is fixed no other message is processed. That might work for system errors like not being able to connect to the database, but if the problem is only with that one event and not the others, we won't be able to process any other record. We thought of the concept of an ErrorTopic: when we get an error, the consumer publishes that event to the ErrorTopic and in the meantime keeps on processing subsequent events. But it looks like we are trying to bring JMS design concepts (from my previous experience) into Kafka, and there may be a better way to handle errors in Kafka. Also, reprocessing from the error topic may change the sequence of messages, which we don't want in some scenarios.
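
To make the ErrorTopic idea concrete, here is a rough sketch of what we had in mind (the producer setup and the topic name "comms-events-error" are illustrative placeholders, not code from our project):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Producer used only for dead-lettering failed events; producerProperties
    // and the topic name are hypothetical.
    final KafkaProducer<String, String> errorProducer =
            new KafkaProducer<String, String>(producerProperties);

    // In the error branch, instead of seeking back to the failed offset:
    errorProducer.send(new ProducerRecord<String, String>(
            "comms-events-error", record.key(), record.value()));
    kafkaConsumer.commitSync(); // move past the failed record
    logger.warn("Event at offset " + record.offset() + " routed to error topic");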

Please let me know how you have handled this scenario in your projects, following standard Kafka practice.

-Tatha

1 Answer

Edoardo Comar

"if the problem is only with that one event and not the others, we won't be able to process any other record"

That's correct, and your suggestion to use an error topic seems a reasonable one.

I also noticed that with your handling of onPartitionsAssigned you essentially do not use the consumer's committed offset, as it seems you'll always seek to the end.

If you want to restart from the last successfully committed offset, you should not perform a seek.
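
In other words, resuming from the committed offset needs no listener logic at all, roughly:

    // With a plain subscribe (no seek in a rebalance listener), the consumer
    // resumes from the group's last committed offset, falling back to
    // auto.offset.reset when no committed offset exists.
    kafkaConsumer.subscribe(topicList);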

Finally, I'd like to point out (though it looks like you already know this) that having 3 consumers in the same group subscribed to a single partition means that 2 out of 3 will be idle.

HTH Edo