Kafka consumer is automatically resumed after a manual pause


I am trying to pause a reactive Kafka consumer using the code below:

reactiveKafkaReceiver.doOnConsumer(consumer -> {
        log.warn("Inside doOnConsumer");
        log.warn("Current partitions: {}", consumer.assignment());
        consumer.pause(consumer.assignment());
        log.warn("Paused partitions");
        return consumer;
    })
    .doOnSuccess(success -> log.info("Successful pause "))
    .doOnError(e -> log.error("Error in pausing", e))
    .subscribe();

I get the success messages when pausing. However, the receiver is then resumed again, with the logs below:

"Paused partitions"

"[Consumer clientId=test.v2, groupId=test.consumerGroup.v1] Group coordinator kafka:443 (id: 2147483647 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response. isDisconnected: false. Rediscovery will be attempted."

"[Consumer clientId=test.v2, groupId=test.consumerGroup.v1] Requesting disconnect from last known coordinator kafka:443 (id: 2147483647 rack: null)"

"[Consumer clientId=test.v2, groupId=test.consumerGroup.v1] Client requested disconnect from node 2147483647"

"Successful pause "

"[Consumer clientId=test.v2, groupId=test.consumerGroup.v1] Discovered group coordinator kafka:443 (id: 2147483647 rack: null)"

"[Consumer clientId=test.v2, groupId=test.consumerGroup.v1] Attempt to heartbeat with Generation{generationId=3, memberId='test.v2-b3209c78-8358-442d-88e7-340af32360e6', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, resetting generation"

"[Consumer clientId=test.v2, groupId=test.consumerGroup.v1] Resetting generation and member id due to: encountered UNKNOWN_MEMBER_ID from HEARTBEAT response"

"[Consumer clientId=test.v2, groupId=test.consumerGroup.v1] Request joining group due to: encountered UNKNOWN_MEMBER_ID from HEARTBEAT response"

"[Consumer clientId=test.v2, groupId=test.consumerGroup.v1] Giving away all assigned partitions as lost since generation/memberID has been reset,indicating that consumer is in old state or no longer part of the group"

"[Consumer clientId=test.v2, groupId=test.consumerGroup.v1] Lost previously assigned partitions test.partition.v3-0, test.partition.v3-1, test.partition.v3-2, test.partition.v3-3"

"[Consumer clientId=test.v2, groupId=test.consumerGroup.v1] The pause flag in partitions [test.partition.v3-3, test.partition.v3-2, test.partition.v3-1, test.partition.v3-0] will be removed due to partition lost."

After this, a rebalance occurs and the Kafka consumer is resumed.
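The last log line says the pause flag is removed when the partitions are lost, so a pause applied once does not survive the rebalance. One possible workaround, sketched here without any Kafka dependency, is to remember that a pause was requested and apply it again each time partitions are assigned. Everything named below (`PausableConsumer`, `StickyPause`, `FakeConsumer`, `StickyPauseDemo`) is hypothetical and only illustrates the pattern. With the plain Apache Kafka client, the matching hook would be `ConsumerRebalanceListener.onPartitionsAssigned`, calling `consumer.pause(...)` there. How best to wire this into the reactive receiver is an assumption left open.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the two consumer operations this sketch needs.
// With the Apache Kafka client these correspond to KafkaConsumer.pause/resume.
interface PausableConsumer {
    void pause(Collection<String> partitions);
    void resume(Collection<String> partitions);
}

// Remembers the caller's intent so that a rebalance cannot silently undo it.
final class StickyPause {
    private volatile boolean pauseRequested = false;

    // Call from the code path that wants to stop consumption.
    void requestPause(PausableConsumer consumer, Collection<String> assigned) {
        pauseRequested = true;
        consumer.pause(assigned);
    }

    // Call from the code path that wants to start consumption again.
    void requestResume(PausableConsumer consumer, Collection<String> assigned) {
        pauseRequested = false;
        consumer.resume(assigned);
    }

    // Call from the partitions-assigned callback after every rebalance.
    void onPartitionsAssigned(PausableConsumer consumer, Collection<String> assigned) {
        if (pauseRequested) {
            consumer.pause(assigned);
        }
    }
}

// In-memory fake that mimics pause flags being dropped when partitions are lost.
final class FakeConsumer implements PausableConsumer {
    final Set<String> paused = new HashSet<>();

    @Override
    public void pause(Collection<String> partitions) {
        paused.addAll(partitions);
    }

    @Override
    public void resume(Collection<String> partitions) {
        paused.removeAll(partitions);
    }

    // Simulates "The pause flag in partitions [...] will be removed due to partition lost."
    void losePartitions() {
        paused.clear();
    }
}

final class StickyPauseDemo {
    public static void main(String[] args) {
        List<String> partitions = List.of("test.partition.v3-0", "test.partition.v3-1");
        FakeConsumer consumer = new FakeConsumer();
        StickyPause sticky = new StickyPause();

        sticky.requestPause(consumer, partitions);
        consumer.losePartitions();                          // pause flags are gone
        sticky.onPartitionsAssigned(consumer, partitions);  // pause is applied again
        System.out.println("Paused after rebalance: " + consumer.paused);
    }
}
```

This only treats the symptom. The logs also show the session timing out without a heartbeat response, which is what causes the partitions to be lost in the first place, so that may be worth investigating separately.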
