Pika ConnectionResetError(104, 'Connection reset by peer') with two connections running in separate threads


I am running into the following error with pika:

pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')

I've read on Stack Overflow and in other sources that this error is caused by missed heartbeats and that threading should be the solution. I've also seen from this talk that it is recommended to have two connections: one for consuming and one for producing. In the pika repository I found this solution, which creates threads to do the workload so that heartbeats are not missed. From the pika FAQ I also know that pika is not thread-safe, so every connection needs to run in its own thread.
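
For context, the worker-thread pattern referred to above can be sketched roughly as follows. This is a minimal sketch and not the code from the pika repository verbatim: `handler` is a hypothetical stand-in for the slow work, and passing it as a third element of `args` is an assumption made for illustration. The only pika calls it relies on are `connection.add_callback_threadsafe`, `channel.is_open` and `channel.basic_ack`.

```python
import functools
import threading


def ack_message(channel, delivery_tag):
    # Runs on the connection's own thread, so touching the channel is safe here.
    if channel.is_open:
        channel.basic_ack(delivery_tag)


def do_work(connection, channel, delivery_tag, body, handler):
    # Runs on a worker thread so the slow part does not block pika's I/O loop.
    handler(body)  # `handler` is hypothetical: whatever "doing the work" means
    # Hand the ack back to the connection thread instead of calling pika directly.
    callback = functools.partial(ack_message, channel, delivery_tag)
    connection.add_callback_threadsafe(callback)


def on_message(channel, method_frame, header_frame, body, args):
    # Called by pika on the connection thread; it only starts a worker and returns.
    connection, threads, handler = args
    worker = threading.Thread(
        target=do_work,
        args=(connection, channel, method_frame.delivery_tag, body, handler),
    )
    worker.start()
    threads.append(worker)
```

The point of the design is that the worker thread never calls into pika itself; it only schedules the ack, and the thread that owns the connection performs it.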

Based on that, I designed my service to have a Consumer and a Producer object, each running in its own thread. The consumer thread creates a new thread for "doing the work".

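# Startup: one thread per connection
producer = Producer()
consumer = Consumer(producer, mgr)

producer.start()
consumer.start()

# Class definitions (excerpt)
import functools
import logging
import threading
import time
from queue import Queue

class Producer(threading.Thread):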
    def __init__(self):
        threading.Thread.__init__(self)
        self.message_queue = Queue()
    
    def run(self):
        self.createConnection()
        # declare your exchanges and bindings here
        self.channel.exchange_declare(exchange=self.exchange, exchange_type='topic')
        
        # sleep 5 seconds
        time.sleep(5)
        
        self.add_message(self.exchange, 'hello.example', {'count': 0})

        while True:
            if not self.message_queue.empty():
                message = self.message_queue.get()
                self.publish(message['exchange'], message['routing_key'], message['body'])

class Consumer(threading.Thread):
    def __init__(self, producer, mgr):
        threading.Thread.__init__(self)

    def run(self):
        self.createConnection()

        # declare your exchanges and bindings here
        self.channel.exchange_declare(exchange=self.exchange, exchange_type='topic')
        
        # example queue declaration
        result = self.channel.queue_declare(queue='', exclusive=True)
        queue_name = result.method.queue
        self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key='*.example')
        
        self.channel.basic_qos(prefetch_count=1)

        threads = []
        on_message_callback = functools.partial(self.on_message, args=(self.connection, threads))
        self.channel.basic_consume(queue=queue_name, on_message_callback=on_message_callback, auto_ack=False)

        logging.info('Start listening to messages... on exchange: {}'.format(self.exchange))
        self.channel.start_consuming()

        # Wait for all to complete
        for thread in threads:
            thread.join()
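
One detail that may be relevant to the heartbeat question: the producer loop above polls `message_queue.empty()` continuously and never hands control back to pika on the producer's connection. A possible variation, as a minimal sketch, blocks on the queue with a timeout and then calls `process_data_events` on the thread that owns the connection. The names `publish_pending`, `stop_event` and `poll_seconds` are hypothetical, and `connection` is assumed to be a pika `BlockingConnection`.

```python
import queue


def publish_pending(connection, publish, message_queue, stop_event, poll_seconds=1.0):
    """Publish queued messages while letting pika service heartbeats."""
    while not stop_event.is_set():
        try:
            # Block briefly instead of spinning on message_queue.empty().
            message = message_queue.get(timeout=poll_seconds)
        except queue.Empty:
            message = None
        if message is not None:
            publish(message['exchange'], message['routing_key'], message['body'])
        # Give the connection a chance to read and send frames,
        # including heartbeats, on the thread that owns it.
        connection.process_data_events(time_limit=0)
```

Inside `Producer.run` this could replace the `while True` loop with something like `publish_pending(self.connection, self.publish, self.message_queue, stop_event)`, where `stop_event` is a `threading.Event` used for shutdown. Whether this is what fixes the reset error is untested; `poll_seconds` would need to stay well below the negotiated heartbeat interval.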


Here is the full example of my service implementation. It is not my exact service, but the general structure and idea are the same. Locally and in my Docker setup the service has so far run fine most of the time, but deployed on my test servers it often runs into the connection reset error.

Are there any flaws with my service implementation? Or could the issue be with my RabbitMQ configuration? (I am using the RabbitMQ Bitnami 3.11.8-0 image, which runs on an AWS EC2 instance. I have not changed any of the default configuration.)

I am a bit stuck and not quite sure what to try next. So I am grateful for any pointers or suggestions.

Thanks!
