How to manually ack messages in RabbitMQ?

800 views Asked by At

I need to know how to manually ack the messages on the queue direct from the Consumer I created and to set a retry strategy of 5 times, each attemp increasing the time like second try 5min, third try, 10min after second try failed, fourth 15min...

I'm kinda lost in the Rabbit documentation, I learned a bit of the concept but the practical use is still a mistery to me...

I'm using Symfony 6.1 and my old_sound_rabbit_mq.yaml looks like this:

old_sound_rabbit_mq:
    connections:
        default:
            host: '%rabbitmqHost%'
            port: '%rabbitmqPort%'
            user: '%rabbitmqUser%'
            password: '%rabbitmqPassword%'
            vhost: '%rabbitmqVhost%'
    consumers:
        upload_file:
            connection: default
            exchange_options: { name: 'upload_file_exchange', type: direct, durable: true, auto_delete: false }
            queue_options: { name: 'upload_file_queue', durable: true, auto_delete: false, arguments: { 'x-max-priority': [ 'I', 20 ] } }
            callback: App\Consumer\UploadFileConsumer
            qos_options: { prefetch_size: 0, prefetch_count: 1, global: false }

This is my consumer:

<?php

declare(strict_types=1);

namespace App\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class UploadFileConsumer implements ConsumerInterface
{
    public function execute(AMQPMessage $msg): void
    {
        try {
            // do something with $msg, if all is good then ack the msg and remove from queue
        } catch (\Exception $e) {
            // keep message in queue, don't ack it, keep it in queue retry 5 times then stop consumer if no success
        }
    }
}
1

There are 1 answers

6
Sammitch On

AMQPMessage provides both ack() and nack() methods for this purpose.

https://github.com/php-amqplib/php-amqplib/blob/master/PhpAmqpLib/Message/AMQPMessage.php#L98-L128

So likely what you want is:

<?php

declare(strict_types=1);

namespace App\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class UploadFileConsumer implements ConsumerInterface
{
    public function execute(AMQPMessage $msg): void
    {
        try {
            // do something with $msg
            $msg->ack();
        } catch (\Exception $e) {
            // keep message in queue, don't ack it, keep it in queue retry
            $msg->nack(true);
        }
    }
}

Though I'm not familiar with a way to limit the number of times a message is re-queued without modifying the headers/payload and re-queueing it as a new message. Alternatively, you can set a TTL value and messages will eventually time out of the queue. You can also create a dead-letter exchange if you want to inspect nack'ed/expired messages. [just make sure to clean it out, otherwise you'll have new problems]

If I had to kludge in a "re-queue X times" I'd suggest a cache with a built-in TTL like Redis, the key is the message ID, and the value is the number of retries.

Edit:

Spitballing some workflows for "Task A must complete before Task B can begin", in order of decreasing preference:

  • If Tasks A and B can never happen without each other, then consolidate Task A and Task B into a single task, as they are not independent.
  • If Task B cannot happen without A, have B invoke A in synchronous/RPC fashion.
  • Create new async Task C that calls A and B in synchronous/RPC fashion.
  • Task A ends with submitting Task B to the queue. [this still has the smell of "A and B" are actually a single work unit]
  • Track task status externally [eg: in a cache like Redis] and have Task B nack/requeue if Task A is not yet completed.

and always beware of "infinite requeue" if your queue does not have a defined TTL, as messages will continue to build up, and your consumers will be working constantly on tasks that may never be completed successfully.