Stream connection lost: RxEndOfFile(-1, 'End of input stream (EOF)' when connecting from Lambda to Amazon MQ using Python + AMQP

98 views Asked by At

My goal is to create a client responsible for sending messages from AWS Lambda to Amazon MQ (ActiveMQ Classic) through AMQP protocol. I've done everything according to the documentation.

class BasicPikaClient:
    def __init__(self, rabbitmq_broker_id, rabbitmq_user, rabbitmq_password, region):
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ssl_context.set_ciphers("ECDHE+AESGCM:!ECDSA")
        ssl_context.load_verify_locations(cafile="ca.pem")

        url = f"amqps://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_broker_id}.mq.{region}.amazonaws.com:5671"
        parameters = pika.URLParameters(url)
        parameters.ssl_options = pika.SSLOptions(context=ssl_context)

        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()


class BasicMessageSender(BasicPikaClient):
    def declare_queue(self, queue_name):
        print(f"Trying to declare queue({queue_name})...")
        self.channel.queue_declare(queue=queue_name)

    def send_message(self, exchange, routing_key, body):
        channel = self.connection.channel()
        channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body)
        print(
            f"Sent message. Exchange: {exchange}, Routing Key: {routing_key}, Body: {body}"
        )

    def close(self):
        self.channel.close()
        self.connection.close()

The documentation seems to be outdated so I had to change ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) with ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT). The ca.pem file comes from the Amazon Trust Services Repository. I've chosen this guy (don't ask me why, I'm just guessing):

enter image description here

When running the script I got such an Exception pika.exceptions.IncompatibleProtocolError: StreamLostError: ("Stream connection lost: RxEndOfFile(-1, 'End of input stream (EOF)')",)

basic_message_sender = BasicMessageSender(
        "b-177801af-xxx-1",
        "xxx",
        "xxx",
        "eu-central-1",
    )

    basic_message_sender.declare_queue("xxx")

    basic_message_sender.send_message(
        exchange="", routing_key="xxx", body=b"Hello World!"
    )

    basic_message_sender.close()
1

There are 1 answers

0
Justin Bertram On

The documentation you're following is for Amazon MQ with the RabbitMQ engine. I don't believe it will work with Amazon MQ with the ActiveMQ Classic engine because RabbitMQ supports AMQP 0.9.1 and ActiveMQ Classic doesn't. ActiveMQ Classic only supports AMQP 1.0.