I am trying to consume submissions from "fresh_queue" three times faster than I consume from "rotten_queue", using "amqplib" in Node.js.
To do that, I am using counter variables to keep track of the completed tasks from both queues.
This is the code I have written:
async consumeMessages() {
// establishing a connection to the RabbitMQ server
const connection = await connect(process.env.AMQP_URL);
console.log("Connected to RabbitMQ with url: " + process.env.AMQP_URL);
//creating a channel
const freshChannel = await connection.createChannel();
// setting the maximum number of tasks that can be done at a time
freshChannel.prefetch(1);
// counter to keep track of the number of tasks done from the fresh submissions queue
let compiledFreshSubmissions = 0;
let compiledRottenSubmissions = 0;
// checking for the existence of the fresh submissions queue
await freshChannel.assertQueue("fresh_submissions", {
durable: false,
});
// consuming the fresh submissions queue
freshChannel.consume("fresh_submissions", async (msg) => {
if (!msg) {
throw new Error("No message to consume");
}
console.log("trying to consume rotten");
if (compiledFreshSubmissions / 3 >= compiledRottenSubmissions) {
console.log(
"[-] Consuming a message from the queue: fresh_submissions And number of done tasks: " +
compiledFreshSubmissions
);
compiledFreshSubmissions++;
await this.processMessages(msg, freshChannel);
} else {
console.log("Skipping fresh message to maintain 3:1 ratio");
}
});
const rottenChannel = await connection.createChannel();
rottenChannel.prefetch(1);
// checking for the existence of the rotten submissions queue
await rottenChannel.assertQueue("rotten_submissions", {
durable: false,
});
await rottenChannel.consume("rotten_submissions", async (msg) => {
if (!msg) {
throw new Error("No message to consume");
}
console.log(
"[-] Consuming a message from the queue: rotten_submissions And number of done tasks: " +
compiledRottenSubmissions
);
compiledRottenSubmissions++;
await this.processMessages(msg, rottenChannel);
});
}
But I get unexpected behavior: it starts consuming from "rotten_queue" and never consumes from "fresh_queue".
I don't fully understand how RabbitMQ works, but I think it might be a concurrency problem.
Any suggestions?