Kafka polling and rebalancing issue


I am pushing events to topic1, and if an event fails, I add it to a DLQ (topic2). I am using separate consumers for both topics, and when an event is pushed to the DLQ I add an identifier (a timestamp) to it.
Now I want to delay the processing of a DLQ record using that timestamp (for testing, the delay is 50 seconds). The event is processed once when it lands in the DLQ, but after the delay elapses, when it should be processed again, the Kafka listener/consumer does not poll any records, and after 10 minutes a rebalance happens. A couple of things I am confused about:

  1. Do I have to explicitly call poll() to fetch records, or does the consumer fetch them from the broker automatically?
  2. Suppose the delay is 30 minutes and the consumer fetches records every 5 minutes. Since I am not processing the records because of the delay, the offset stays stuck at that point. Will the broker then consider the consumer alive or dead?
  3. Does the heartbeat work only through the poll() call, or does it run independently? That is, is it sent only when poll() is called?
  4. If I do not call poll(), will the consumer still be able to fetch records? Is there any default mechanism for this?
  5. How can I avoid rebalancing while keeping this delay? Can I do it by configuring max.poll.interval.ms, request.timeout.ms, session.timeout.ms, heartbeat.interval.ms, the poll timeout, or max.poll.records?
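For reference, the timeouts asked about in question 5 can be sketched as plain consumer properties. This is a minimal illustration, not a recommendation; the values are made up, and the comments summarize how each setting relates to the poll/heartbeat questions above:

```java
import java.util.Properties;

public class ConsumerTimeouts {
    // Sketch of the consumer properties that govern liveness vs. poll() progress.
    // Values are illustrative only.
    static Properties delayTolerantConfig(long delayMs) {
        Properties props = new Properties();
        // Heartbeats are sent by a background thread, independently of poll();
        // they only tell the broker the consumer process is alive.
        props.put("heartbeat.interval.ms", "3000");
        props.put("session.timeout.ms", "30000");
        // poll() liveness: if poll() is not called again within this window,
        // the broker assumes the consumer is stuck and triggers a rebalance.
        // To block inside the listener for delayMs, this must exceed delayMs.
        props.put("max.poll.interval.ms", String.valueOf(delayMs + 60_000));
        // Fewer records per poll shortens the time spent between polls.
        props.put("max.poll.records", "10");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(delayTolerantConfig(50_000).getProperty("max.poll.interval.ms"));
    }
}
```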
@KafkaListener(
    id = "${kafka.topic1.id}",
    concurrency = "${kafka.topic.concurrency}",
    groupId = "${spring.kafka.consumer.group-id}",
    topics = {"${kafka.topic}"},
    containerFactory = "kafkaListenerContainerFactory")
private void consumer(
    ConsumerRecord<?, ?> consumerRecord, Acknowledgment acknowledgment) {

  log.info(
      "Received message from topic {} at offset {}",
      consumerRecord.topic(),
      consumerRecord.offset());

  String eventType = getEventType(consumerRecord);
  String serializedKafkaData = consumerRecord.value().toString();

  eventHandler
      .handleEvent(eventType, serializedKafkaData)
      .doOnSuccess(
          aBoolean -> {
            acknowledgment.acknowledge();
          })
      .onErrorResume(
          err -> {
            acknowledgment.acknowledge();
            log.error(
                "Exception Occurred in executing event {}",
                consumerRecord.value().toString(),
                err);
            // handle with particular timeout case in connector
            if (err instanceof Exception) {
              List<Header> headers = new ArrayList<>();
              headers.add(
                  new RecordHeader(
                      CommonConstants.EVENT_TYPE_HEADER_IN_CONSUMER,
                      Events.valueOf(eventType).name().getBytes()));
              headers.add(
                  new RecordHeader(
                      CommonConstants.TIMESTAMP, LocalDateTime.now().toString().getBytes()));
              eventHandler.handleEventForDLQ(eventType, consumerRecord, headers);
            }
            return Mono.error(err);
          })
      .subscribe();
}
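The DLQ listener below calls a getRecordTimestamp helper that is not shown. A hypothetical sketch of what it might do, assuming the TIMESTAMP header is the one written above with LocalDateTime.now().toString(), reduced to the pure parsing step:

```java
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;

public class DlqTimestamp {
    // Hypothetical: parse the TIMESTAMP header bytes back into a LocalDateTime.
    // Assumes the header was written as LocalDateTime.now().toString() (ISO-8601).
    static LocalDateTime parseTimestampHeader(byte[] headerValue) {
        return LocalDateTime.parse(new String(headerValue, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(parseTimestampHeader("2024-01-01T12:00:00".getBytes(StandardCharsets.UTF_8)));
    }
}
```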

@KafkaListener(
    id = "${kafka.topic2.dlq.id}",
    concurrency = "${kafka.topic.concurrency}",
    groupId = "${spring.kafka.consumer.group-id}",
    topics = {"${kafka.post.payment.events.topic.dlq}"},
    containerFactory = "kafkaListenerContainerFactory")
private void consumerDlq(
    ConsumerRecord<?, ?> consumerRecord, Acknowledgment acknowledgment) {

  log.info(
      "Received message from DLQ topic {} at offset {}",
      consumerRecord.topic(),
      consumerRecord.offset());

  String eventType = getEventType(consumerRecord);
  String serializedKafkaData = consumerRecord.value().toString();
  LocalDateTime timestamp = getRecordTimestamp(consumerRecord);
  
  if (LocalDateTime.now().minusSeconds(50).isAfter(timestamp)) {
    log.info("50s delay has elapsed for record at offset {}", consumerRecord.offset());
    eventHandler
        .handleEvent(eventType, serializedKafkaData)
        .doOnSuccess(
            aBoolean -> {
              acknowledgment.acknowledge();
            })
        .onErrorResume(
            err -> {
              acknowledgment.acknowledge();
              log.error(
                  "Exception Occurred in executing DLQs event {}",
                  consumerRecord.value().toString(),
                  err);
              // handle with particular timeout case in connector
              if (err instanceof Exception) {
                List<Header> headers = new ArrayList<>();
                headers.add(
                    new RecordHeader(
                        CommonConstants.EVENT_TYPE_HEADER_IN_CONSUMER,
                        Events.valueOf(eventType).name().getBytes()));
                headers.add(
                    new RecordHeader(
                        CommonConstants.TIMESTAMP, LocalDateTime.now().toString().getBytes()));
                eventHandler.handleEventForDLQ(eventType, consumerRecord, headers);
              }
              return Mono.error(err);
            })
        .subscribe();
  }
}
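One way to honor the delay without stalling the offset would be to negatively acknowledge a not-yet-ready record instead of silently skipping it. This is only a sketch: it assumes the container factory is in manual-ack mode and a Spring Kafka version where Acknowledgment.nack(Duration) is available, which causes a pause and redelivery of the record. The computation of the remaining delay is shown as a pure function:

```java
import java.time.Duration;
import java.time.LocalDateTime;

public class DlqDelay {
    static final Duration DELAY = Duration.ofSeconds(50);

    // Time still to wait before the record may be processed; ZERO when ready.
    static Duration remainingDelay(LocalDateTime recordTimestamp, LocalDateTime now) {
        LocalDateTime readyAt = recordTimestamp.plus(DELAY);
        return now.isBefore(readyAt) ? Duration.between(now, readyAt) : Duration.ZERO;
    }

    // In the DLQ listener this could be used roughly as (sketch, not verified):
    //
    //   Duration remaining = remainingDelay(timestamp, LocalDateTime.now());
    //   if (remaining.isZero()) {
    //     // process, then acknowledgment.acknowledge()
    //   } else {
    //     acknowledgment.nack(remaining); // pause and redeliver, offset not stuck
    //   }

    public static void main(String[] args) {
        System.out.println(remainingDelay(LocalDateTime.now().minusSeconds(10), LocalDateTime.now()));
    }
}
```

With this approach the listener keeps returning promptly, so poll() continues within max.poll.interval.ms and no rebalance is triggered by the delay.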

There are 0 answers