RabbitMQ not respecting job priority

770 views Asked by At

I'm trying to get RabbitMQ working in a multi-queue, message priority scenario. However, while I'm setting the job's priority correctly, the worker processes are not respecting the priority set in the message.

Task runner:

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$i = 0;

while ($i <= 50000) {
    $p = rand(1, 255);
    $data = "Message #".$i." Priority: ".$p;
    $msg = new AMQPMessage($data,
                            array(
                                'delivery_mode' => 2, # make message persistent
                                'priority' => $p
                            )
                          );

    $channel->basic_publish($msg, '', 'task_queue');
    $i++;
}

$channel->close();
$connection->close();

Worker:

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo " [x] Received ", $msg->body, " Priority: ", $msg->get('priority'),"\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done", "\n";
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

This fires off 50k jobs and then the worker "handles" them. The problem is that even when I start the worker late, to allow the queue to fill up with jobs of varying priority, it still grabs them FIFO style.

Am I missing a flag or something that will make the worker pull jobs in order of highest priority?

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received Message #932 Priority: 107 
 [x] Done
 [x] Received Message #933 Priority: 5 
 [x] Done
 [x] Received Message #934 Priority: 183 
 [x] Done
 [x] Received Message #935 Priority: 231
 [x] Done
 [x] Received Message #936 Priority: 181 
 [x] Done
1

There are 1 answers

0
old_sound On BEST ANSWER

To use priority queues on RabbitMQ you need to set the x-max-priority property when declaring your queue:

https://www.rabbitmq.com/priority.html