How to stop bull queue from automatically restarting jobs when restarting the server


I am using the bull queue to process jobs.

Let's say a job is running with a status of active when I restart my dev server. When the worker script starts up again, the process is still set to active in the queue, so bull decides to start up the worker process again.

This becomes disruptive very quickly: the script restarts often during development, so many processes end up running and making a mess of things. All I want is for bull to NOT restart these jobs when the server starts.

Things I've tried:

    let active_jobs = await queue.getJobs(['active']);
    active_jobs.forEach(async (active_job) => {
      await active_job.discard()
      await active_job.moveToFailed(new Error("Auto-killed during dev server restart"))
    })

None of this works. Anyone have a solution to achieve this?


There are 2 answers

Torsten Barthel On BEST ANSWER

It's actually easy if you have the needed functions in place. You could implement some code that completely cleans the specific queues. That code has to live in a place that's called exactly once when your server starts up, so a good place would be the constructor of your class (on the producer server). That way you always start with a clean queue with zero entries for development purposes. It is best to wrap the call that empties the queues in a condition that checks for a development environment.

You can use something like the following:

    // Fetch every key in the Redis database the queue is connected to.
    const getKeys = async (q) => {
      const multi = q.multi();
      multi.keys('*');
      const keys = await multi.exec();
      return keys[0][1];
    };

    // Keep only the keys that belong to this queue.
    const filterQueueKeys = (q, keys) => {
      const prefix = `${q.keyPrefix}:${q.name}`;
      return keys.filter(k => k.includes(prefix));
    };

    // Delete the given keys in a single MULTI transaction.
    const deleteKeys = async (q, keys) => {
      const multi = q.multi();
      keys.forEach(k => multi.del(k));
      await multi.exec();
    };

    const emptyQueue = async (q) => {
      const keys = await getKeys(q);
      const queueKeys = filterQueueKeys(q, keys);
      await deleteKeys(q, queueKeys);
    };

    // Only wipe the queue when running in development.
    if (process.env.NODE_ENV === 'development') {
      emptyQueue(this.workerQueue).then(() => {
        console.log('QUEUE EMPTY!');
      });
    }
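
One thing to watch with the key filter above: matching with `includes` could also catch keys of another queue whose name merely starts with the same text. A minimal sketch of a stricter variant follows, assuming Bull's usual `<keyPrefix>:<queueName>:<suffix>` key layout. The helper names `queueKeyPrefix`, `filterQueueKeysStrict` and `emptyQueueInDev` are hypothetical and not part of Bull.

```javascript
// Sketch only: these helper names are hypothetical, not Bull APIs.
// Assumption: Bull stores its keys as `<keyPrefix>:<queueName>:<suffix>`.
const queueKeyPrefix = (q) => `${q.keyPrefix}:${q.name}:`;

// Keep only keys that start with this queue's prefix, including the
// trailing colon, so a queue named "mail" does not match "mailer" keys.
const filterQueueKeysStrict = (q, keys) =>
  keys.filter((k) => k.startsWith(queueKeyPrefix(q)));

// Run the given cleanup function only when the environment is "development".
// Resolves to true if the cleanup ran and false if it was skipped.
const emptyQueueInDev = async (q, cleanup, env = process.env.NODE_ENV) => {
  if (env !== 'development') return false;
  await cleanup(q);
  return true;
};
```

In the constructor this might be called as `emptyQueueInDev(this.workerQueue, emptyQueue)`, with `emptyQueue` being the function from the answer.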
Jakob On

Try:

    queue.obliterate({force: true});

Afterwards the queue is paused, so you have to resume it.
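
The obliterate approach could be wrapped in a small startup helper, roughly as sketched below. This assumes a Bull version that provides `queue.obliterate()` and that calling `queue.resume()` is what un-pauses the queue afterwards. `resetQueueForDev` is a hypothetical name, not a Bull function.

```javascript
// Sketch only: `resetQueueForDev` is a hypothetical helper, not part of Bull.
// `queue` is assumed to be a Bull Queue instance that supports obliterate().
async function resetQueueForDev(queue, env = process.env.NODE_ENV) {
  // Never wipe a queue outside development.
  if (env !== 'development') return false;

  // Remove the queue and all of its jobs; `force` allows this even while
  // jobs are still marked as active.
  await queue.obliterate({ force: true });

  // The queue is reported to be paused afterwards, so resume it.
  await queue.resume();
  return true;
}
```

Calling `await resetQueueForDev(queue)` once at worker startup, before registering the processor with `queue.process(...)`, would be one way to use it.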