Receiving duplicate msg in the consumer

77 views Asked by At

Issue: Our consumer service is receiving duplicate msg even after placing a commit on the offset.

Tried: Initially we had the interval level commit and then moved to the offset level commit but still this didn't solve the issue. I am also simulating CICD(restarting services every 2 minutes) because we mostly encounter duplication when there is a pod restart or there is a deployment happening.

Current Setup:

PFB the methods we have:

initialize consumer

const groupId = `group-${topicName.toLowerCase()}-${process.env.NODE_ENV}`;

we have a bunch of services having the same groupID

subscribeConsumer

await this.consumer.subscribe({
     topic,
     fromBeginning: true,
});

getMessage

this.consumer.run({
    eachBatchAutoResolve: false,
    // autoCommitInterval: 1000,
    // autoCommitThreshold: 5,
    eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
                if (!isRunning() || isStale()) return;
                await heartbeat();
                await processTopicMsg(logger, batch.messages, country, resolveOffset, heartbeat);
                await heartbeat();
            },
        });

processTopicMsg:

perform the processing of the msg

resolveOffset(topicMsg[Index].offset);
await heartbeat();
0

There are 0 answers