Embedded AMQP Java Broker

27.3k views Asked by At

I am trying to create integration test for a Scala / Java application that connects to a RabbitMQ broker. To achieve this I would like an embedded broker that speaks AMQP that I start and stop before each test. Originally I tried to introduce ActiveMQ as an embedded broker with AMQP however the application uses RabbitMQ so only speaks AMQP version 0.9.3 whereas ActiveMQ requires AMQP version 1.0.

Is there another embedded broker I can use in place of ActiveMQ?

5

There are 5 answers

8
OrangeDog On BEST ANSWER

A completely in-memory solution. Replace the spring.* properties as required.

<dependency>
  <groupId>org.apache.qpid</groupId>
  <artifactId>qpid-broker</artifactId>
  <version>6.1.1</version>
  <scope>test</scope>
</dependency>
public class EmbeddedBroker {
  public void start() {
    Broker broker = new Broker();
    BrokerOptions brokerOptions = new BrokerOptions();
    brokerOptions.setConfigProperty("qpid.amqp_port", environment.getProperty("spring.rabbitmq.port"));
    brokerOptions.setConfigProperty("qpid.broker.defaultPreferenceStoreAttributes", "{\"type\": \"Noop\"}");
    brokerOptions.setConfigProperty("qpid.vhost", environment.getProperty("spring.rabbitmq.virtual-host"));
    brokerOptions.setConfigurationStoreType("Memory");
    brokerOptions.setStartupLoggedToSystemOut(false);
    broker.startup(brokerOptions);
  }
}

Add initial-config.json as a resource:

{
  "name": "Embedded Test Broker",
  "modelVersion": "6.1",
  "authenticationproviders" : [{
    "name": "password",
    "type": "Plain",
    "secureOnlyMechanisms": [],
    "users": [{"name": "guest", "password": "guest", "type": "managed"}]
  }],
  "ports": [{
    "name": "AMQP",
    "port": "${qpid.amqp_port}",
    "authenticationProvider": "password",
    "protocols": [ "AMQP_0_9_1" ],
    "transports": [ "TCP" ],
    "virtualhostaliases": [{
      "name": "${qpid.vhost}",
      "type": "nameAlias"
    }]
  }],
  "virtualhostnodes" : [{
    "name": "${qpid.vhost}",
    "type": "Memory",
    "virtualHostInitialConfiguration": "{ \"type\": \"Memory\" }"
  }]
}
0
Arnost Valicek On

You can try Apache QPid Java broker. This can be used as embedded broker.

Setup in Scala is described in another SO question - Example of standalone Apache Qpid (amqp) Junit Test

0
Ian Dallas On

I'm not aware of any embedded RabbitMQ servers so I think you have a couple options to workaround this:

  1. Your RabbitMQ server does not need to exist on your CI server, you can bring up a new server that is your CI rabbitmq server. If you can't bring one up yourself you could look into CloudAMQP. The free tier today offers: 1M messages per month, 20 concurrent connections, 100 queues, 10,000 queued messages. Could be enough for your CI process.

  2. If your testing is only being done unit tests for RabbitMQ you could mock out your RabbitMQ message production. This is what we do in some of our unit tests. We just check that a certain operation make the method call to produce a specific message, but we mock this out so we don't actually publish a message. Then we test each of the consumers by explicitly calling the consumer methods with a specific message we created.

0
Timi On

Here's the solution proposed by OrangeDog adapted to Qpid Broker 7.x, inspired from here:

Add qpid 7.x as test dependecies. In 7.x these have been separated in core + plugins, depending on what you need. For RabbitMQ AMQP version you'll need qpid-broker-plugins-amqp-0-8-protocol and for running in-memory (sufficient for integration tests) use qpid-broker-plugins-memory-store.

pom.xml:

...
<properties>
    ...
    <qpid-broker.version>7.0.2</qpid-broker.version>
</properties>

<dependencies>
    ...
    <dependency>
        <groupId>org.apache.qpid</groupId>
        <artifactId>qpid-broker-core</artifactId>
        <version>${qpid-broker.version}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.qpid</groupId>
        <artifactId>qpid-broker-plugins-amqp-0-8-protocol</artifactId>
        <version>${qpid-broker.version}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.qpid</groupId>
        <artifactId>qpid-broker-plugins-memory-store</artifactId>
        <version>${qpid-broker.version}</version>
        <scope>test</scope>
    </dependency>
</dependecies>
...

Add broker configuration with hard-coded user/password and default in-memory virtual host mapped to default port (5672):

qpid-config.json:

{
  "name": "EmbeddedBroker",
  "modelVersion": "7.0",
  "authenticationproviders": [
    {
      "name": "password",
      "type": "Plain",
      "secureOnlyMechanisms": [],
      "users": [{"name": "guest", "password": "guest", "type": "managed"}]
    }
  ],
  "ports": [
    {
      "name": "AMQP",
      "port": "${qpid.amqp_port}",
      "authenticationProvider": "password",
      "virtualhostaliases": [
        {
          "name": "defaultAlias",
          "type": "defaultAlias"
        }
      ]
    }
  ],
  "virtualhostnodes": [
    {
      "name": "default",
      "defaultVirtualHostNode": "true",
      "type": "Memory",
      "virtualHostInitialConfiguration": "{\"type\": \"Memory\" }"
    }
  ]
}

Define junit ExternalResource and declare as ClassRule (or start and close your embedded broker in your IT @BeforeClass and @AfterClass methods):

EmbeddedAMQPBroker.java:

public class EmbeddedAMQPBroker extends ExternalResource {

    private final SystemLauncher broker = new SystemLauncher();

    @Override
    protected void before() throws Throwable {
        startQpidBroker();
        //createExchange();
    }

    @Override
    protected void after() {
        broker.shutdown();
    }

    private void startQpidBroker() throws Exception {
        Map<String, Object> attributes = new HashMap<>();
        attributes.put("type", "Memory");
        attributes.put("initialConfigurationLocation", findResourcePath("qpid-config.json"));
        broker.startup(attributes);
    }

    private String findResourcePath(final String fileName) {
        return EmbeddedAMQPBroker.class.getClassLoader().getResource(fileName).toExternalForm();
    }
}

Integration test:

public class MessagingIT{
    @ClassRule
    public static EmbeddedAMQPBroker embeddedAMQPBroker = new EmbeddedAMQPBroker();

    ...
}
1
Alejandro On

I've developed a wrapper around the process of downloading, extracting, starting and managing RabbitMQ so it can work like an embedded service controlled by any JVM project.

Check it out: https://github.com/AlejandroRivera/embedded-rabbitmq

It's as simple as:

EmbeddedRabbitMqConfig config = new EmbeddedRabbitMqConfig.Builder()
    .version(PredefinedVersion.V3_5_7)
    .build();
EmbeddedRabbitMq rabbitMq = new EmbeddedRabbitMq(config);
rabbitMq.start();
...
rabbitMq.stop();

Works on Linux, Mac and Windows.