What's the best way to retry publishing messages with kombu?

2.9k views Asked by At

I'm testing how kombu works. I'm planning to replace pika in several projects. I see that kombu has a lot of documentation but using what I found in the documentation some messages are lost. Here it's the code:

from kombu import Connection, Producer
conn = Connection('amqp://localhost:5672')
def errback(exc, interval):
     logger.error('Error: %r', exc, exc_info=1)
     logger.info('Retry in %s seconds.', interval)
producer = Producer(conn)
publish = conn.ensure(producer, producer.publish, errback=errback, max_retries=3)
for i in range(1, 200000):
   publish({'hello': 'world'}, routing_key='test_queue')
   time.sleep(0.001)

When it's publishing I close the connection several times and it keeps publishing but in the queue there are around 60000 messages, so there are a lot of lost messages.

I've tried different alternatives e.g:

publish({'hello': 'world'}, retry=True, mandatory=True, routing_key='hipri')

Thanks!

1

There are 1 answers

2
Félix On

The problem was that by default Kombu doesn't use 'confirm', you have to use:

        conn = Connection('amqp://localhost:5672', transport_options={'confirm_publish': True})

Thanks