Consume 2 RabbitMQ queues using 1 consumer in NodeJS

23 views Asked by At

I am trying to consume submissions from "fresh_queue" three times faster than I consume from "rotten_queue", using "amqplib" in Node.js.

To do that, I am using counter variables to keep track of the completed tasks from both queues.

This is the code I have written:

async consumeMessages() {
    // establishing a connection to the RabbitMQ server
    const connection = await connect(process.env.AMQP_URL);

    console.log("Connected to RabbitMQ with url: " + process.env.AMQP_URL);

    //creating a channel
    const freshChannel = await connection.createChannel();

    // setting the maximum number of tasks that can be done at a time
    freshChannel.prefetch(1);

    // counter to keep track of the number of tasks done from the fresh submissions queue
    let compiledFreshSubmissions = 0;
    let compiledRottenSubmissions = 0;

    // checking for the existence of the fresh submissions queue
    await freshChannel.assertQueue("fresh_submissions", {
      durable: false,
    });

    // consuming the fresh submissions queue
    freshChannel.consume("fresh_submissions", async (msg) => {
      if (!msg) {
        throw new Error("No message to consume");
      }

      console.log("trying to consume rotten");
      if (compiledFreshSubmissions / 3 >= compiledRottenSubmissions) {
        console.log(
          "[-] Consuming a message from the queue: fresh_submissions And number of done tasks: " +
            compiledFreshSubmissions
        );
        compiledFreshSubmissions++;
        await this.processMessages(msg, freshChannel);
      } else {
        console.log("Skipping fresh message to maintain 3:1 ratio");
      }
    });

    const rottenChannel = await connection.createChannel();
    rottenChannel.prefetch(1);

    // checking for the existence of the rotten submissions queue
    await rottenChannel.assertQueue("rotten_submissions", {
      durable: false,
    });

    await rottenChannel.consume("rotten_submissions", async (msg) => {
      if (!msg) {
        throw new Error("No message to consume");
      }

      console.log(
        "[-] Consuming a message from the queue: rotten_submissions And number of done tasks: " +
          compiledRottenSubmissions
      );

      compiledRottenSubmissions++;
      await this.processMessages(msg, rottenChannel);
    });
  }

But I get unexpected behavior: it starts consuming the "rotten_queue" and never consumes the "fresh_queue".

I don't fully understand how RabbitMQ works, but I think it might be a concurrency problem.

Any suggestions?

0

There are 0 answers