Below are the details, including the code, of how I implement jobs with Node.js.
Some things to keep in mind: GCP runs multiple instances, and they all have access to this one worker and the same Redis instance.
I am new to jobs in general and did not see this issue before, but now that the number of users on the platform has grown, it happens to at least 10% of them, while the rest of the jobs succeed.
BullMQ keeps showing the following errors:
- Missing key for job "abc"
- Missing lock for job "abc"
- Error: Job "abc" is not in the active state. failed
- Error: Lock mismatch for job "abc." Cmd failed from active
The tech stack:
- GCP App Engine standard
- GCP Redis Memorystore
- Node.js
- Express
As for my code, I have divided the BullMQ code into 4 parts:
- The first part is the setup of the QUEUE:
import { JobsOptions, Queue } from 'bullmq';
import { REDIS_QUEUE_HOST, REDIS_QUEUE_PORT } from '../../../../middleware/env';
import { DEFAULT_REMOVE_CONFIG, QUEUE_NAME_REPLY } from '../../../../constants/queueSystem';
import { setUpReplyWorker } from './reply-worker';

// reply queue
export const replyQueue = new Queue(QUEUE_NAME_REPLY, {
  connection: {
    host: REDIS_QUEUE_HOST,
    port: parseInt(REDIS_QUEUE_PORT || '6379'),
    connectTimeout: 1000 * 60 * 5,
  },
});

setUpReplyWorker();

export const addReplyJob = async <T>(jobName: string, data: T, config?: JobsOptions) => {
  return replyQueue.add(jobName, data, {
    removeOnComplete: true,
    removeOnFail: true,
    ...config,
  });
};
- The second part is the WORKER:
import { Job, Worker } from 'bullmq';
import { REDIS_QUEUE_HOST, REDIS_QUEUE_PORT } from '../../../../middleware/env';
import { QUEUE_NAME_REPLY } from '../../../../constants/queueSystem';
import { replyJobProcessor } from './reply-jobProcessor';

export const setUpReplyWorker = (): void => {
  const worker = new Worker(QUEUE_NAME_REPLY, replyJobProcessor, {
    connection: {
      host: REDIS_QUEUE_HOST,
      port: parseInt(REDIS_QUEUE_PORT || '6379'),
    },
    autorun: true,
    concurrency: 20000,
    // 5 mins
    lockDuration: 1000 * 60 * 5,
  });

  worker.on('completed', (job: Job, returnvalue: 'DONE') => {
    console.debug(`Completed job "reply" with id ${job.id}`, returnvalue);
  });

  worker.on('active', (job: Job<unknown>) => {
    console.debug(`Active job "reply" with id ${job.id}`);
  });

  worker.on('error', (failedReason: Error) => {
    console.error(`Job encountered an error "reply"`, failedReason);
  });
};
- The third is the job processor that I pass as an argument to the worker:
import { Job } from 'bullmq';
import { someFunction } from '../../automate/someFunction';

export const replyJobProcessor = async (
  job: Job<{
    someId: string;
  }>,
): Promise<'DONE'> => {
  await job.log(`Started processing job with id ${job.id}`);
  await someFunction({
    someId: "id"
  });
  await job.log(`Finished processing job with id ${job.id}`);
  return 'DONE';
};
- The FUNCTION itself:
export const someFunction = async ({ someId }: { someId: string }) => {
  let incorrectData;
  // Call API to get the data
  if (incorrectData) {
    someFunction({ someId });
    return;
  }
  await continueToAnotherFunction({ someId });
  return;
};
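One thing that stands out in the function above: the recursive call is neither awaited nor returned, so the processor's promise can resolve (and the job can be marked completed) while the retry is still running in the background, and the recursion has no upper bound. Below is a minimal sketch of an awaited, bounded retry. `RetryDeps`, `fetchData`, `isValid`, and `maxAttempts` are hypothetical stand-ins, since the original does not show the API call, and `continueToAnotherFunction` is injected only so the sketch runs on its own. This is not necessarily the cause of the lock errors; it only keeps all of the work inside the job's lifetime.

```typescript
// Hypothetical dependencies, injected so the sketch runs without a real API.
interface RetryDeps<T> {
  fetchData: (someId: string) => Promise<T>; // stand-in for "Call API to get the data"
  isValid: (data: T) => boolean;             // stand-in for the incorrectData check
  continueToAnotherFunction: (args: { someId: string }) => Promise<void>;
}

const someFunction = async <T>(
  { someId }: { someId: string },
  deps: RetryDeps<T>,
  maxAttempts = 3, // assumed limit; the original retries without a bound
): Promise<void> => {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const data = await deps.fetchData(someId);
    if (deps.isValid(data)) {
      // Awaited, so the caller's promise settles only after the work is done.
      await deps.continueToAnotherFunction({ someId });
      return;
    }
  }
  // Throwing lets the job fail visibly instead of retrying forever.
  throw new Error(`Data for ${someId} still incorrect after ${maxAttempts} attempts`);
};
```

With this shape, the processor would `await someFunction({ someId: job.data.someId }, deps)`, so the job is only reported as done once the retries have actually finished.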
I tried to replicate the problem, and even after going through BullMQ's documentation I don't understand why my job would be processed twice. I say twice because these errors occur whenever a job is processed twice, yet I have made sure that my own code isn't running twice. I even saw that when a job is locked and active, it isn't processed again, even if the job id is the same.
Also I DO NOT manually move jobs to any state. I haven't touched that aspect at all.
What else can I do to resolve this? Please help.
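One way to narrow this down might be to log the worker's `failed` and `stalled` events in addition to `completed`, `active`, and `error`. The sketch below assumes the listener shapes from BullMQ's worker events (`failed` receives the job and the error, `stalled` receives the job id). `WorkerLike`, `LogFn`, and `attachDiagnostics` are hypothetical names, and the structural interface is only there so the snippet runs without Redis; the intent is to pass in the `worker` created in `setUpReplyWorker`.

```typescript
// Minimal structural stand-in for the part of a BullMQ Worker used here.
interface WorkerLike {
  on(event: string, listener: (...args: any[]) => void): unknown;
}

type LogFn = (message: string) => void;

const attachDiagnostics = (worker: WorkerLike, log: LogFn = console.warn): void => {
  // Emitted when a job's lock expired and the job was moved back to the wait list.
  worker.on('stalled', (jobId: string) => {
    log(`Job ${jobId} stalled: its lock expired before the processor finished`);
  });
  // Emitted when the processor throws or the job cannot be finalized.
  worker.on('failed', (job: { id?: string } | undefined, err: Error) => {
    log(`Job ${job?.id ?? 'unknown'} failed: ${err.message}`);
  });
};
```

If `stalled` shows up for the same job ids that later report a missing or mismatched lock, that would point to locks expiring while the processor is still running. In that case the very high `concurrency: 20000` and anything that may pause or throttle the instance are the first things I would examine.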
I'm facing exactly the same issue. In my opinion, the source of the problem is that the processor gets blocked for some reason, so the library unlocks the message and it's processed again. Maybe the processor takes too long to complete the action. I'm completely lost.
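To make that reasoning concrete, here is a deliberately simplified in-memory model of a token lock with a time-to-live. It is not BullMQ's implementation (BullMQ keeps its locks in Redis), and `LockStore`, `acquire`, `renew`, and `release` are hypothetical names; the error strings only echo the messages from the question. The sketch shows how a worker that fails to renew in time can lose its job to a second worker and then fail when it tries to finish.

```typescript
class LockStore {
  private locks = new Map<string, { token: string; expiresAt: number }>();
  private lockDurationMs: number;

  constructor(lockDurationMs: number) {
    this.lockDurationMs = lockDurationMs;
  }

  // Take the lock if it is free or expired; returns false while another token holds it.
  acquire(jobId: string, token: string, now: number): boolean {
    const held = this.locks.get(jobId);
    if (held && held.expiresAt > now) return false;
    this.locks.set(jobId, { token, expiresAt: now + this.lockDurationMs });
    return true;
  }

  // Extend the lock, but only if this token still owns an unexpired lock.
  renew(jobId: string, token: string, now: number): boolean {
    const held = this.locks.get(jobId);
    if (!held || held.token !== token || held.expiresAt <= now) return false;
    held.expiresAt = now + this.lockDurationMs;
    return true;
  }

  // Finish the job; throws when the caller no longer owns the lock.
  release(jobId: string, token: string, now: number): void {
    const held = this.locks.get(jobId);
    if (!held || held.expiresAt <= now) throw new Error(`Missing lock for job ${jobId}`);
    if (held.token !== token) throw new Error(`Lock mismatch for job ${jobId}`);
    this.locks.delete(jobId);
  }
}

// Timeline in milliseconds with a 30 s lock.
const lockStore = new LockStore(30_000);
lockStore.acquire('abc', 'worker-A', 0); // worker A starts the job

// Worker A is blocked and never renews; at t = 31 s the lock has expired,
// so worker B can take the same job and process it a second time.
const takenByB = lockStore.acquire('abc', 'worker-B', 31_000);

let errorSeenByA = '';
try {
  lockStore.release('abc', 'worker-A', 40_000); // worker A finally finishes
} catch (err) {
  errorSeenByA = (err as Error).message; // the lock now belongs to worker B
}
```

In this model the job runs twice even though neither worker called the processor twice, which matches the symptom described above. A worker that keeps calling `renew` before the lock expires never loses the job.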