How do I return a task to the queue in RabbitMQ and PHP?


How can I return a message to the queue if the processing result does not suit me? I have only found information about message acknowledgments, but I don't think that fits my case. If processing yields the result RETRY, I need the message to be added back to the queue, so that this worker or another one picks it up again and tries to process it.

For example:

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();

$channel->queue_declare('test', false, false, false, false);

$callback = function($msg) {
    $condition = json_decode($msg->body);

    if (!$condition) {
        # return to the queue
    }
};

$channel->basic_consume('test', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();
?>
3

There are 3 answers

1
Honarkhah On BEST ANSWER

Set the no_ack flag of basic_consume to false. The parameters of basic_consume are:

queue: Queue from where to get the messages
consumer_tag: Consumer identifier
no_local: Don't receive messages published by this consumer.
no_ack: If true, the server treats messages as acknowledged as soon as they are delivered; if false, the consumer must acknowledge them itself.
exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
nowait: Don't wait for a server response
callback: A PHP Callback

$channel->basic_consume('test', '', false, false, false, false, $callback);

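You must use acknowledgments. If processing fails, do not ack the message; send a negative acknowledgment (nack) with requeue enabled instead, so the broker puts it back in the queue: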

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();

$channel->queue_declare('test', false, false, false, false);

$callback = function($message) {
    $condition = json_decode($message->body);
     
    if (!$condition) {
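        // return to the queue: the third argument (requeue) must be true,
        // otherwise the message is dropped or dead-lettered
        $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, true);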
    }else{
        // send ack , remove from queue
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    }
};

$channel->basic_consume('test', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

Of course, with this approach the rejected message goes straight back to the head of the queue and is redelivered immediately. If you want to keep track of retries, there is another possibility:

Define a queue for retries, preferably named <your-queue-name>-retry, and a dead-letter queue, preferably named <your-queue-name>-dlq.

Setting up the -retry queue is the most important part. You need to declare it with the following features:

x-dead-letter-exchange: should be the exchange your main queue is bound to
x-dead-letter-routing-key: should be the same as your main queue's routing key
x-message-ttl: the delay between retries, in milliseconds
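
As a minimal sketch of those three arguments with php-amqplib, assuming the main queue is `test` on the default exchange (as in the question), the retry queue could be declared roughly like this. The helper names `retryQueueArguments` and `declareRetryQueue` are hypothetical, and making the queue durable is an assumption rather than a requirement.

```php
<?php
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Wire\AMQPTable;

// Hypothetical helper: builds the arguments for a retry queue whose expired
// messages are dead-lettered back to the main queue.
function retryQueueArguments(string $exchange, string $routingKey, int $delayMs): array
{
    return [
        'x-dead-letter-exchange'    => $exchange,   // '' means the default exchange
        'x-dead-letter-routing-key' => $routingKey, // routing key of the main queue
        'x-message-ttl'             => $delayMs,    // delay between retries, in ms
    ];
}

// Declares "<main queue>-retry". Nothing consumes from this queue; messages
// wait there until the TTL expires and are then routed back to the main queue.
function declareRetryQueue(AMQPChannel $channel, string $mainQueue, int $delayMs): void
{
    $arguments = new AMQPTable(retryQueueArguments('', $mainQueue, $delayMs));
    // Positional arguments: passive, durable, exclusive, auto_delete, nowait, arguments
    $channel->queue_declare($mainQueue . '-retry', false, true, false, false, false, $arguments);
}
```

With an open channel, `declareRetryQueue($channel, 'test', 10000)` would create `test-retry` with a 10 second delay.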

The code below is only a sketch to give you the idea; please do not copy-paste it as is.

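// requires: use PhpAmqpLib\Message\AMQPMessage;
$maximumRetry = 5;
$callback = function ($message) use ($channel, $maximumRetry) {
    // decode to an associative array so the attempt counter can be stored in it
    $body = json_decode($message->body, true);
    try {
        // process the message here; throw an exception when it should be retried
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    } catch (Exception $e) {
        // count this attempt in the message body
        $body['try_attempt'] = !empty($body['try_attempt']) ? (int) $body['try_attempt'] + 1 : 1;
        if ($body['try_attempt'] >= $maximumRetry) {
            // give up: nack without requeue so the broker dead-letters the message
            $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
            return;
        }
        // publish an updated copy to the retry queue, then ack the original
        $msg = new AMQPMessage(json_encode($body));
        $channel->basic_publish($msg, '', 'test-retry');
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    }
};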

We are going to need 3 queues for retrying:

  • queue.example

    • bindings:
      • exchange: queue.exchange
      • routing: queue.example
    • features:
      • x-dead-letter-exchange: queue.exchange
      • x-dead-letter-routing-key: queue.example-dlq
  • queue.example-dlq

    • bindings:
      • exchange: queue.exchange
      • routing: queue.example-dlq
  • queue.example-retry

    • bindings:
      • exchange: queue.exchange
      • routing: queue.example-retry
    • features:
      • x-dead-letter-exchange: queue.exchange
      • x-dead-letter-routing-key: queue.example
      • x-message-ttl: 10000
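
The layout above could be declared with php-amqplib along these lines. This is a sketch under assumptions, not a definitive setup: the helper names `retryTopology` and `declareRetryTopology` are hypothetical, the exchange is assumed to be a durable direct exchange, and expired messages from the retry queue are assumed to be routed back to the main queue's routing key (`queue.example`).

```php
<?php
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Wire\AMQPTable;

// Hypothetical helper: maps each queue name to its declaration arguments.
function retryTopology(string $exchange, string $queue, int $delayMs): array
{
    return [
        // main queue: rejected messages are dead-lettered to the -dlq queue
        $queue => [
            'x-dead-letter-exchange'    => $exchange,
            'x-dead-letter-routing-key' => $queue . '-dlq',
        ],
        // dead-letter queue: final resting place, no special arguments
        $queue . '-dlq' => [],
        // retry queue: messages expire after $delayMs and return to the main queue
        $queue . '-retry' => [
            'x-dead-letter-exchange'    => $exchange,
            'x-dead-letter-routing-key' => $queue,
            'x-message-ttl'             => $delayMs,
        ],
    ];
}

function declareRetryTopology(AMQPChannel $channel, string $exchange, string $queue, int $delayMs): void
{
    // Positional arguments: type, passive, durable, auto_delete
    $channel->exchange_declare($exchange, 'direct', false, true, false);
    foreach (retryTopology($exchange, $queue, $delayMs) as $name => $arguments) {
        // Positional arguments: passive, durable, exclusive, auto_delete, nowait, arguments
        $channel->queue_declare($name, false, true, false, false, false, new AMQPTable($arguments));
        // each queue is bound with its own name as the routing key
        $channel->queue_bind($name, $exchange, $name);
    }
}
```

Calling `declareRetryTopology($channel, 'queue.exchange', 'queue.example', 10000)` on an open channel would create all three queues and their bindings.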

------------- Update -------------

Quorum queues provide this ability out of the box: in the consumer you can tell how many times each message has been delivered, and you can easily define a dead-letter queue for it. For more information, read about quorum queues and poison message handling in the RabbitMQ documentation.
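
A rough sketch of how a consumer might read that counter with php-amqplib is shown below. It assumes the broker reports the number of earlier delivery attempts in the `x-delivery-count` message header and that the queue is declared with the `x-queue-type` and `x-delivery-limit` arguments; the helper names are hypothetical, so check the behavior against your RabbitMQ version.

```php
<?php
use PhpAmqpLib\Message\AMQPMessage;

// Hypothetical helper: arguments for a quorum queue that dead-letters a
// message once it has been redelivered $deliveryLimit times.
function quorumQueueArguments(int $deliveryLimit, string $deadLetterExchange): array
{
    return [
        'x-queue-type'           => 'quorum',
        'x-delivery-limit'       => $deliveryLimit,
        'x-dead-letter-exchange' => $deadLetterExchange,
    ];
}

// Hypothetical helper: reads the attempt counter from a plain header array.
// The header is treated as 0 when it is absent (for example on first delivery).
function deliveryCountFromHeaders(array $headers): int
{
    return isset($headers['x-delivery-count']) ? (int) $headers['x-delivery-count'] : 0;
}

// Extracts the headers from a consumed message and returns the counter.
function deliveryCount(AMQPMessage $msg): int
{
    if (!$msg->has('application_headers')) {
        return 0;
    }
    return deliveryCountFromHeaders($msg->get('application_headers')->getNativeData());
}
```

Inside a consumer callback, `deliveryCount($msg)` could then replace a hand-maintained `try_attempt` field in the message body.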

1
John Doe On

The solution turned out to be easier than I thought. The problem was not specifically about RabbitMQ but about variable scope: $channel was not visible inside the callback. If anyone is interested, here is the solution:

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();

$channel->queue_declare('test', false, false, false, false);

$callback = function($msg) {
  global $channel;

  $condition = json_decode($msg->body);

  if (!$condition) {
    $msg = new AMQPMessage(json_encode(array(
      'condition' => false
    )));

    $channel->basic_publish($msg, '', 'test');
  }
};

$channel->basic_consume('test', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
  $channel->wait();
}

$channel->close();
$connection->close();
?>
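
The scope issue behind this answer can be shown without a broker. In the sketch below an `ArrayObject` stands in for the AMQP channel (an assumption made only so the example runs on its own); it illustrates that a PHP closure cannot see an outer variable unless it is imported, and that `use ($channel)` is an alternative to `global $channel`.

```php
<?php
// Stand-in for the AMQP channel so the example runs without RabbitMQ.
$channel = new ArrayObject();

// Without `use` (or `global`), $channel is undefined inside the closure.
$withoutUse = function () {
    return isset($channel);
};

// `use ($channel)` captures the outer variable when the closure is created.
$withUse = function (string $body) use ($channel) {
    $channel->append($body); // in the real worker: $channel->basic_publish(...)
    return isset($channel);
};

var_dump($withoutUse());     // bool(false)
var_dump($withUse('retry')); // bool(true)
```

Unlike `global`, `use` also works when the worker code itself is wrapped in a function or a class method.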
0
jean davy Nizigama On

This may be a bit late, but this is how to do it with php-amqplib ("php-amqplib/php-amqplib": "^3.1"). Set the no_ack parameter of the basic_consume method to false (which is the default), then reject the message explicitly in the callback by calling the nack method on the AMQPMessage object passed to it, with requeue set to true:

<?php
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    
    echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
    
    $connection = new AMQPStreamConnection($AMQP);
    $channel = $connection->channel();
    
    $channel->queue_declare('test', false, false, false, false);
    
    $callback = function($msg) {
        $condition = json_decode($msg->body);
    
        if (!$condition) {
            // message will be added back to the queue
            $msg->nack(true);
        } else {
            // processed successfully: acknowledge so the broker removes it
            $msg->ack();
        }
    };
    
    $channel->basic_consume('test', '', false, false, false, false, $callback);
    
    while(count($channel->callbacks)) {
        $channel->wait();
    }
    
    $channel->close();
    $connection->close();
    ?>