Below are the details, including the code, of how I am implementing jobs with Node.js.

Something to keep in mind: GCP runs multiple instances, and they all have access to this one worker and the same Redis instance.

As a newbie to job queues in general, I didn't see this issue before, but now that the number of users on the platform has increased it happens to at least 10% of the jobs, while the rest complete successfully.

I keep running into the following errors from BullMQ:

  1. Missing key for job "abc"
  2. Missing lock for job "abc"
  3. Error: Job "abc" is not in the active state. failed
  4. Error: Lock mismatch for job "abc." Cmd failed from active

The tech stack:

  • GCP App Engine Standard
  • GCP Redis Memorystore
  • Nodejs
  • Express

As for my code, I have divided the BullMQ code into four parts:

  1. The first part is the setup of the QUEUE:
import { JobsOptions, Queue } from 'bullmq';
import { REDIS_QUEUE_HOST, REDIS_QUEUE_PORT } from '../../../../middleware/env';
import { DEFAULT_REMOVE_CONFIG, QUEUE_NAME_REPLY } from '../../../../constants/queueSystem';
import { setUpReplyWorker } from './reply-worker';

// reply queue
export const replyQueue = new Queue(QUEUE_NAME_REPLY, {
  connection: {
    host: REDIS_QUEUE_HOST,
    port: parseInt(REDIS_QUEUE_PORT || '6379'),
    connectTimeout: 1000 * 60 * 5, // 5 minutes
  },
});

setUpReplyWorker();

export const addReplyJob = async <T>(jobName: string, data: T, config?: JobsOptions) => {
  return replyQueue.add(jobName, data, {
    removeOnComplete: true,
    removeOnFail: true,
    ...config,
  });
};
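
For completeness, this is roughly how jobs get added from an Express route; the route path, payload shape, and import path below are illustrative rather than my exact handler:

import express from 'express';
import { addReplyJob } from './reply-queue'; // import path is illustrative

const router = express.Router();

router.post('/reply', async (req, res) => {
  // the payload mirrors the processor's Job<{ someId: string }> data type
  const job = await addReplyJob('reply', { someId: req.body.someId });
  res.json({ jobId: job.id });
});

export default router;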
  2. The second part is the WORKER:
import { Job, Worker } from 'bullmq';
import { REDIS_QUEUE_HOST, REDIS_QUEUE_PORT } from '../../../../middleware/env';
import { QUEUE_NAME_REPLY } from '../../../../constants/queueSystem';
import { replyJobProcessor } from './reply-jobProcessor';

export const setUpReplyWorker = (): void => {
  const worker = new Worker(QUEUE_NAME_REPLY, replyJobProcessor, {
    connection: {
      host: REDIS_QUEUE_HOST,
      port: parseInt(REDIS_QUEUE_PORT || '6379'),
    },
    autorun: true,
    concurrency: 20000,
    // 5 mins
    lockDuration: 1000 * 60 * 5,
  });

  worker.on('completed', (job: Job, returnvalue: 'DONE') => {
    console.debug(`Completed job "reply" with id ${job.id}`, returnvalue);
  });

  worker.on('active', (job: Job<unknown>) => {
    console.debug(`Active job "reply" with id ${job.id}`);
  });
  worker.on('error', (failedReason: Error) => {
    console.error(`Job encountered an error "reply"`, failedReason);
  });
};
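
For reference, here is a sketch of 'failed' and 'stalled' listeners that could be added inside setUpReplyWorker to surface more detail; both are standard BullMQ worker events, and the log wording is only illustrative:

// sketch: extra diagnostics inside setUpReplyWorker
worker.on('failed', (job: Job | undefined, err: Error) => {
  console.error(`Job "reply" with id ${job?.id} failed`, err);
});

worker.on('stalled', (jobId: string) => {
  // a stalled job is one whose lock expired while it was still marked active
  console.warn(`Job "reply" with id ${jobId} stalled`);
});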
  3. The third part is the job processor that I pass as an argument to the worker:
import { Job } from 'bullmq';
import { someFunction } from '../../automate/someFunction';

export const replyJobProcessor = async (
  job: Job<{
    someId: string;
  }>,
): Promise<'DONE'> => {
  await job.log(`Started processing job with id ${job.id}`);

  await someFunction({
    someId: job.data.someId
  });

  await job.log(`Finished processing job with id ${job.id}`);
  return 'DONE';
};
  4. The FUNCTION itself:
export const someFunction = async ({ someId }: { someId: string }) => {
  let incorrectData;
  
  // Call API to get the data

  if (incorrectData) {
    await someFunction({ someId }); // retry if the API returned unusable data
    return;
  }

  await continueToAnotherFunction({ someId });

  return;
};

I have tried to replicate this, and even after going through BullMQ's documentation I don't understand why my jobs would be processed twice, because these errors only appear when a job is processed twice. I have made sure my code isn't adding or running the same job twice. I have also seen that while a job is locked and active, a job with the same id won't be processed again.
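
My understanding is based on the deduplication BullMQ applies when a custom jobId is supplied: while a job with that id still exists, adding it again should not create a duplicate. A minimal sketch of what I mean (jobId is a standard JobsOptions field; the value here is illustrative):

// first add creates the job
await addReplyJob('reply', { someId }, { jobId: someId });
// while the first job with this id still exists, a second add should be a no-op
await addReplyJob('reply', { someId }, { jobId: someId });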

Also I DO NOT manually move jobs to any state. I haven't touched that aspect at all.

What else can I do to resolve this? Please help.

1 Answer

Answer by Pedro Blanco Pascual:

I'm facing exactly the same issue. In my opinion, the source of the problem is that the processor gets blocked for some reason, so the library releases the job's lock and the job is processed again. Maybe the processor takes too long to complete the action. I'm completely lost.
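
If that is what is happening, the behaviour should be governed by the worker's lock and stall settings. A rough sketch of the relevant options, with illustrative values (lockDuration, stalledInterval, and maxStalledCount are documented BullMQ WorkerOptions):

// sketch: worker options that govern stalled-job handling (values are illustrative)
const worker = new Worker(QUEUE_NAME_REPLY, replyJobProcessor, {
  connection: { host: REDIS_QUEUE_HOST, port: parseInt(REDIS_QUEUE_PORT || '6379') },
  lockDuration: 1000 * 60 * 5, // how long a job may run before its lock expires
  stalledInterval: 30 * 1000,  // how often the stall checker looks for expired locks
  maxStalledCount: 1,          // how many times a stalled job is re-queued before it is failed
});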