It is possible to publish messages to a RabbitMQ queue with a per-message TTL: such messages expire once the TTL elapses and (if a dead-letter queue is set up) are moved to the dead-letter queue.
But is it possible to specify such a per-message TTL using Celery?
Note that I'm not looking for a way to specify task expiration, but rather message expiration: I want my messages to spend a configurable amount of time in the queue before finally being picked up at the dead-letter queue.
TIA.
Short introduction: Expiration vs Expires
RabbitMQ does support per-message TTL (as well as TTL for the queue); the behavior is documented here: https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers. The trick is to set the `expiration` message property (https://www.rabbitmq.com/publishers.html#message-properties), in milliseconds, when the message is published.

Celery, on the other hand, allows you to set the `expires` parameter (https://docs.celeryproject.org/en/stable/reference/celery.app.task.html) in seconds or as a datetime. The difference from the native RabbitMQ functionality is that the message remains in the queue after expiration. The expired message is delivered to the worker, which then reads the `expires` header, determines that the message has expired, and rejects it.

tl;dr: `expiration` != `expires`

How to pass a message property in Celery
This method is not documented in Celery. I figured it out by trial and error because I wanted a native TTL myself.
The `send_task` method (`celery.app.base.Celery.send_task`), which is called by `apply_async`, for example, accepts a `**options` parameter. Any `**options` that Celery does not recognize are then passed to `send_task_message( ... )` (`celery.app.amqp.AMQP.send_task_message`) as `**kwargs`, and from there become message properties.

So if we can set a message property, there is nothing easier than setting the native expiration:
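A minimal sketch of that idea, not a definitive implementation. The helper names `apply_with_message_ttl` and `expected_expiration_property` are hypothetical and not part of Celery. It assumes the extra `expiration` option reaches kombu's `Producer.publish`, whose `expiration` argument is a TTL in seconds that kombu converts to the millisecond string RabbitMQ expects, so the value is given in seconds here rather than milliseconds.

```python
from typing import Any, Optional


def apply_with_message_ttl(task: Any, ttl_seconds: float,
                           args: tuple = (),
                           kwargs: Optional[dict] = None) -> Any:
    """Publish `task` with a native per-message TTL (hypothetical helper)."""
    if ttl_seconds < 0:
        raise ValueError("ttl_seconds must not be negative")
    # `expiration` is not a Celery option. The assumption is that it falls
    # through **options and ends up as the AMQP `expiration` property.
    # This is different from Celery's own `expires` option.
    return task.apply_async(args=args, kwargs=kwargs or {},
                            expiration=ttl_seconds)


def expected_expiration_property(ttl_seconds: float) -> str:
    """Value the broker should see, assuming kombu's seconds-to-ms conversion."""
    return str(int(ttl_seconds * 1000))
```

With a real task this could be called as `apply_with_message_ttl(my_awesome_task, 42, args=(11, 22))`, where `my_awesome_task` stands in for any Celery task. Inspecting the published message in the RabbitMQ management UI is a reasonable way to confirm that the `expiration` property actually arrived with the value from `expected_expiration_property`.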