I am trying to pause a Reactor Kafka consumer using the code below:
    reactiveKafkaReceiver.doOnConsumer(consumer -> {
            log.warn("Inside doOnConsumer");
            log.warn("Current partitions: {}", consumer.assignment());
            // Pause every partition currently assigned to this consumer
            consumer.pause(consumer.assignment());
            log.warn("Paused partitions");
            return consumer;
        })
        .doOnSuccess(success -> log.info("Successful pause "))
        .doOnError(e -> log.error("Error in pausing", e))
        .subscribe();
The success messages are logged when the pause is applied. However, the consumer is then resumed again, as the logs below show:
"Paused partitions"
"[Consumer clientId=test.v2, groupId=test.consumerGroup.v1] Group coordinator kafka:443 (id: 2147483647 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response. isDisconnected: false. Rediscovery will be attempted."
"[Consumer clientId=test.v2, groupId=test.consumerGroup.v1] Requesting disconnect from last known coordinator kafka:443 (id: 2147483647 rack: null)"
"[Consumer clientId=test.v2, groupId=test.consumerGroup.v1] Client requested disconnect from node 2147483647"
"Successful pause "
"[Consumer clientId=test.v2, groupId=test.consumerGroup.v1] Discovered group coordinator kafka:443 (id: 2147483647 rack: null)"
"[Consumer clientId=test.v2, groupId=test.consumerGroup.v1] Attempt to heartbeat with Generation{generationId=3, memberId='test.v2-b3209c78-8358-442d-88e7-340af32360e6', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, resetting generation"
"[Consumer clientId=test.v2, groupId=test.consumerGroup.v1] Resetting generation and member id due to: encountered UNKNOWN_MEMBER_ID from HEARTBEAT response"
"[Consumer clientId=test.v2, groupId=test.consumerGroup.v1] Request joining group due to: encountered UNKNOWN_MEMBER_ID from HEARTBEAT response"
"[Consumer clientId=test.v2, groupId=test.consumerGroup.v1] Giving away all assigned partitions as lost since generation/memberID has been reset,indicating that consumer is in old state or no longer part of the group"
"[Consumer clientId=test.v2, groupId=test.consumerGroup.v1] Lost previously assigned partitions test.partition.v3-0, test.partition.v3-1, test.partition.v3-2, test.partition.v3-3"
"[Consumer clientId=test.v2, groupId=test.consumerGroup.v1] The pause flag in partitions [test.partition.v3-3, test.partition.v3-2, test.partition.v3-1, test.partition.v3-0] will be removed due to partition lost."
After this, a rebalance takes place, the partitions are reassigned without the pause flag, and the Kafka consumer resumes consuming.
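
To make the behaviour in the logs concrete, here is a minimal in-memory sketch of what seems to be happening, and of one possible workaround: remember that a pause was requested and apply it again whenever partitions are assigned. This is a simplified model, not the Kafka client. `ModelConsumer`, `PauseGuard` and all of their methods are hypothetical names made up for illustration. In Reactor Kafka the corresponding hook would presumably be an assign listener registered through `ReceiverOptions.addAssignListener`, but whether pausing from there behaves as modelled here is an assumption I have not verified.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified in-memory model, NOT the Kafka client: every name here is hypothetical.
class PauseAcrossRebalanceSketch {

    /** Stand-in for the consumer's assignment and per-partition pause flags. */
    static class ModelConsumer {
        private final Set<String> assigned = new HashSet<>();
        private final Set<String> paused = new HashSet<>();

        // Only partitions that are currently assigned can be paused in this model.
        void pause(Collection<String> partitions) {
            for (String p : partitions) {
                if (assigned.contains(p)) {
                    paused.add(p);
                }
            }
        }

        // Models "Lost previously assigned partitions": the assignment and the
        // pause flags are dropped together.
        void losePartitions() {
            assigned.clear();
            paused.clear();
        }

        // Models the assignment after a rebalance: partitions come back unpaused,
        // then the optional guard gets a chance to react.
        void assign(Collection<String> partitions, PauseGuard guard) {
            assigned.addAll(partitions);
            if (guard != null) {
                guard.onAssigned(this, partitions);
            }
        }

        Set<String> assignment() { return new HashSet<>(assigned); }

        Set<String> paused() { return new HashSet<>(paused); }
    }

    /** Remembers the intent to pause and re-applies it on every new assignment. */
    static class PauseGuard {
        private boolean pauseRequested;

        void requestPause(ModelConsumer consumer) {
            pauseRequested = true;
            consumer.pause(consumer.assignment());
        }

        void onAssigned(ModelConsumer consumer, Collection<String> partitions) {
            if (pauseRequested) {
                consumer.pause(partitions);
            }
        }
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("test.partition.v3-0", "test.partition.v3-1");
        ModelConsumer consumer = new ModelConsumer();
        PauseGuard guard = new PauseGuard();

        consumer.assign(partitions, guard);   // initial assignment, nothing paused yet
        guard.requestPause(consumer);         // the pause from the question
        consumer.losePartitions();            // session timeout: pause flags are removed
        consumer.assign(partitions, guard);   // rebalance: the guard pauses again

        System.out.println("Paused after rebalance: " + consumer.paused());
    }
}
```

Without the guard (passing `null` to `assign`), the model ends up with no paused partitions after the rebalance, which matches what I am seeing. The design choice in the sketch is to store the pause as intent outside the consumer, since the consumer's own pause flags do not seem to survive losing the partitions.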