Issue:
Our consumer service receives duplicate messages even after committing the offset.
Tried:
Initially we committed offsets on an interval, then switched to committing per offset, but this did not solve the issue. I am also simulating CI/CD (restarting the services every 2 minutes), because we mostly see duplicates when a pod restarts or a deployment is in progress.
Current Setup:
Below are the methods we have:
initialize consumer:
const groupId = `group-${topicName.toLowerCase()}-${process.env.NODE_ENV}`;
Several of our services use the same group ID.
subscribeConsumer:
await this.consumer.subscribe({
  topic,
  fromBeginning: true,
});
getMessage:
this.consumer.run({
  eachBatchAutoResolve: false,
  // autoCommitInterval: 1000,
  // autoCommitThreshold: 5,
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
    if (!isRunning() || isStale()) return;
    await heartbeat();
    await processTopicMsg(logger, batch.messages, country, resolveOffset, heartbeat);
    await heartbeat();
  },
});
processTopicMsg:
// process the message, then:
resolveOffset(topicMsg[Index].offset);
await heartbeat();
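For reference, here is a minimal sketch of the shape of processTopicMsg, assuming messages are handled one at a time in order. handleMessage is a hypothetical stand-in for the actual processing, and the loop structure is an assumption inferred from the indexed resolveOffset call above. It is not the exact production code.

```javascript
// Hypothetical placeholder for the real per-message processing.
async function handleMessage(logger, message, country) {
  logger.info(`processing offset ${message.offset} for ${country}`);
}

// Simplified sketch: the argument order matches the call made from eachBatch.
async function processTopicMsg(logger, topicMsg, country, resolveOffset, heartbeat) {
  for (let index = 0; index < topicMsg.length; index++) {
    await handleMessage(logger, topicMsg[index], country);
    // Mark this message's offset as processed.
    resolveOffset(topicMsg[index].offset);
    // Send a heartbeat after each message so long batches do not time out.
    await heartbeat();
  }
}
```

If handleMessage throws, the loop stops before resolveOffset is called for that message, so in this sketch only the offsets of fully processed messages are resolved.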