Binding exchange to exchange in Symfony messenger component

Is it possible to define a binding between exchanges in the Symfony Messenger component? (I'm on version 4.4.)

I know it's possible to bind an exchange to a queue, like this:

transports:
    incoming:
        dsn: "%env(RABBITMQ_SHARED_URL)%"
        options:
            queues:
                app.pl_incoming_events:
                    binding_keys:
                        - pl.app.#
            exchange:
                name: my_app.incoming
                type: topic

The app then sets up the exchange, the queue, and the binding between them. I'd like the same effect for binding one exchange to another based on a routing key.

I know I can use rabbitmq-bundle, but IMO that's redundant; I'd like to stick with one component for managing RabbitMQ.

For instance, I'd like to bind the other_app exchange to the my_app.incoming exchange based on some routing key.

There is 1 answer

Answered by msg

Messenger is not a RabbitMQ manager; you cannot even declare multiple exchanges in the same transport out of the box. But since it has all the required components, and Symfony is a bit permissive with the config in this case, you can abuse the system and build it yourself.

Since I don't know your requirements, I'll keep it basic; hopefully it'll get you started.

Start by creating an AmqpTransportFactory:

// src/Amqp/AmqpTransportFactory.php
namespace App\Amqp;

use Symfony\Component\Messenger\Transport\AmqpExt\AmqpFactory;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransport;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

class AmqpTransportFactory implements TransportFactoryInterface
{
    public function createTransport(
        string $dsn, array $options, SerializerInterface $serializer
    ): TransportInterface
    {
        unset($options['transport_name']);

        // Collect the main exchange name and the extra exchanges to bind to it
        $exchanges = [
            'name' => $options['exchange']['name'],
            'bindings' => $options['exchange']['bindings'] ?? [],
        ];

        // Passing unknown options is deprecated in 5.1
        unset($options['exchange']['bindings']);

        $connection = Connection::fromDsn($dsn, $options);

        // Ensure our exchange is created first
        $connection->exchange()->declareExchange();
        $channel = $connection->channel();
    
        // This is normally done in the Connection, but is harder to override
        $this->createExchanges($channel, $exchanges);

        return new AmqpTransport($connection, $serializer);
    }

    public function supports(string $dsn, array $options): bool
    {
        return 0 === strpos($dsn, 'amqp://');
    }

    private function createExchanges(\AMQPChannel $channel, array $configuration): void
    {
        $factory = new AmqpFactory();

        foreach ($configuration['bindings'] as $exchange_name => $arguments) {
            $exchange = $factory->createExchange($channel);
            $exchange->setName($exchange_name);
            $exchange->setType($arguments['type'] ?? \AMQP_EX_TYPE_FANOUT);
            $exchange->setFlags($arguments['flags'] ?? \AMQP_DURABLE);
            $exchange->declareExchange();

            // Accept a single key or a list; with no keys configured, bind with an empty routing key
            $binding_keys = (array) ($arguments['binding_keys'] ?? ['']);

            foreach ($binding_keys as $key) {
                // Binds this exchange (destination) to the main exchange (source)
                $exchange->bind($configuration['name'], $key);
            }
        }
    }
}

Register the service under the ID of the built-in AMQP transport factory, so that it replaces it:

# config/services.yaml
services:
  messenger.transport.amqp.factory:
    class: App\Amqp\AmqpTransportFactory
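
If the services file does not enable autoconfigure in its defaults, the overriding definition will probably also need the transport factory tag, since replacing a service definition does not carry over the tags of the original. A sketch under that assumption (the `messenger.transport_factory` tag is the one Symfony documents for custom transport factories; whether your setup needs it explicitly is something to verify):

```yaml
# config/services.yaml
services:
  messenger.transport.amqp.factory:
    class: App\Amqp\AmqpTransportFactory
    # Assumption: only required when autoconfigure is not enabled for this service
    tags: [messenger.transport_factory]
```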

Add the new configuration to the exchange:

# config/packages/messenger.yaml
exchange:
  name: my_app.incoming
  type: topic
  bindings:
    other_app:
      type: direct
      binding_keys: ['route']
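
For context, here is how that fragment would presumably sit inside the transport from the question. This is a sketch that merges the two snippets; the `framework.messenger` nesting is the usual location of the Messenger config, and `bindings` is a custom key read only by the factory above, not an option Messenger itself knows about:

```yaml
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            incoming:
                dsn: "%env(RABBITMQ_SHARED_URL)%"
                options:
                    queues:
                        app.pl_incoming_events:
                            binding_keys:
                                - pl.app.#
                    exchange:
                        name: my_app.incoming
                        type: topic
                        # Custom key consumed by App\Amqp\AmqpTransportFactory
                        bindings:
                            other_app:
                                type: direct
                                binding_keys: ['route']
```

Each entry under `bindings` is declared as its own exchange (fanout and durable unless `type` or `flags` say otherwise) and then bound to `my_app.incoming` once per binding key.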

This results in the following bindings:

+-----------------+------------------------+------------------------+
|     source      |      destination       |      routing_key       |
+-----------------+------------------------+------------------------+
| my_app.incoming | app.pl_incoming_events | pl.app.#               |
| my_app.incoming | other_app              | route                  |
+-----------------+------------------------+------------------------+