It is possible to publish messages into a RabbitMQ queue with an expiration TTL: such messages will expire once the TTL is done and (if a dead-letter queue is setup,) removed to the dead-letter queue.
But is it possible to specify such per-message TTL using Celery?
Note that I'm not looking for a way to specify task-expiration but rather message expiration: I want my messages to spend (a configurable) amount of time in the queue before finally getting picked up @ the dead-letter queue.
TIA.
Short introduction: Expiration vs Expires
RabbitMQ does support per-message TTL (as well as TTL for the queue), the behavior is documented here: https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers. The trick is to set the
expiration
Message Property (https://www.rabbitmq.com/publishers.html#message-properties) when the message is published (in milliseconds).Celery on the other hand allows you to set the
expires
parameter (https://docs.celeryproject.org/en/stable/reference/celery.app.task.html) in seconds or as a datetime. The difference from the native RabbitMQ functionality is that the message remains in the queue after expiration. The expired message is delivered to the worker, which then reads the expires header to determine that the message has expired and rejects the message.tl;dr:
expiration != expires
How to pass a message property in Celery
This method is not documented in Celery. I figured it out by trial and error because I wanted a native TTL myself.
The
send_task
method (celery.app.base.Celery.send_task
), which is called for example byapply_async
, accepts the**options
parameter. All**options
unknown to Celery are then passed in thecelery.app.amqp.Queues->send_task_message( ... )
method as**kwargs
and then as message properties.So if we can set the message property, there is nothing easier than setting the native expiration: