I am using BullMQ with an express server to process jobs asynchronously, but I am confused about how to retrieve the results from completed jobs.
What I am doing currently is to listen for job completed status event and store those results in an object with job Id as key and retrieving the results from that object whenever I need it. Is there a recommended way of doing this?
I looked at BullMQ documentation but couldn't find anything about how to retrieve results.
Here is the sample code:
server.js
// Enqueue a new job on the work queue and hand the client back its id,
// so the result can be polled for later via /api/jobstatus/:id.
app.post("/api/submitjob", async (_req, res) => {
  const job = await workQueue.add();
  res.json({ id: job.id });
});
// Report the current state of a previously submitted job.
// Responds 404 when the id is unknown; otherwise returns state, progress,
// failure reason (if any), and the cached result from the completed-event map.
app.get("/api/jobstatus/:id", async (req, res) => {
  const id = req.params.id;
  const job = await workQueue.getJob(id);
  // getJob resolves to null (or undefined) when no job with this id exists,
  // so use a loose null check to cover both.
  if (job == null) {
    res.status(404).end();
  } else {
    const state = await job.getState();
    // Bug fix: `progress` was referenced without ever being defined, which
    // made this handler throw a ReferenceError on every hit. Bull exposes
    // the current progress via job.progress() when called with no argument.
    const progress = job.progress();
    const reason = job.failedReason;
    res.json({ id, state, progress, reason, result: jobIdResultMap[id] });
  }
});
// You can listen to global events to get notified when jobs are processed.
// NOTE(review): caching results in an in-process object means they are lost
// on restart, never evicted, and not shared between server instances —
// consider reading job.returnvalue from Redis (or a results queue) instead.
workQueue.on('global:completed', (jobId, result) => {
  // Fixed typo in the log message ("succesfully" -> "successfully").
  logger.log('info', `${jobId} successfully completed`);
  // Bull serializes return values to JSON before emitting the global event,
  // so parse it back into an object here.
  jobIdResultMap[jobId] = JSON.parse(result);
});
// Start the HTTP server; HOST and PORT are defined elsewhere in the file.
app.listen(PORT, () => console.log(`✅ API Server started: http://${HOST}:${PORT}/api/v1/endpoint`));
worker.js:
let throng = require("throng");
let Queue = require("bull");
// Redis connection string: the Heroku-provided URL in production, or a
// local instance during development.
const REDIS_URL = process.env.REDIS_URL || "redis://127.0.0.1:6379";

// Number of worker processes to fork — defaults to 2 when WEB_CONCURRENCY
// is unset. Spinning up multiple processes takes advantage of more CPU cores.
// See: https://devcenter.heroku.com/articles/node-concurrency for more info
const workers = process.env.WEB_CONCURRENCY || 2;

// The maximum number of jobs each worker should process at once. This will
// need to be tuned for your application: mostly network-bound jobs can run
// with a much higher value, while CPU-intensive jobs may need a lower one.
const maxJobsPerWorker = 50;
/**
 * Pause for the given number of milliseconds.
 * Wraps setTimeout in a promise so async code can simply `await sleep(ms)`.
 * @param {number} ms - delay in milliseconds
 * @returns {Promise<void>} resolves once the delay has elapsed
 */
function sleep(ms) {
  return new Promise((done) => {
    setTimeout(done, ms);
  });
}
/**
 * Connect to the named work queue and begin processing jobs.
 * Each worker process forked by throng runs this once.
 */
function start() {
  // Connect to the named work queue
  const workQueue = new Queue("work", REDIS_URL);

  workQueue.process(maxJobsPerWorker, async (job) => {
    // Example job: no real work, just a short delay.
    // Replace this with your own job logic.
    // (Removed the dead `let progress = 0;` local — it was never read or
    // reported, and the old comment wrongly claimed progress was reported.)
    await sleep(50);

    // A job can return a value; Bull stores it in Redis as JSON and passes
    // it to the 'global:completed' event as the `result` argument, which is
    // how the server side retrieves it.
    return { value: "This will be stored" };
  });
}
// Initialize the clustered worker process
// See: https://devcenter.heroku.com/articles/node-concurrency for more info
// throng forks `workers` child processes and invokes start() in each one.
throng({ workers, start });
The recommended way of doing this is to use a job queue and a message queue.
At the time of writing BullMQ's documentation is incomplete, therefore you should look at the docs from Bull.
From the documentation -
Returning Job Completions
A common pattern is where you have a cluster of queue processors that just process jobs as fast as they can, and some other services that need to take the result of this processors and do something with it, maybe storing results in a database.
The most robust and scalable way to accomplish this is by combining the standard job queue with the message queue pattern: a service sends jobs to the cluster just by opening a job queue and adding jobs to it, and the cluster will start processing as fast as it can. Every time a job gets completed in the cluster a message is sent to a results message queue with the result data, and this queue is listened by some other service that stores the results in a database.