Auto-delete queue not recreated after broker outage


I have a question about configuring spring-amqp through XML wiring versus creating the various objects directly in Java code.

Doing this via XML and via Java code is pretty simple and has been working well for me in most cases. However, after an outage of the broker (simulated by shutting down and restarting my broker) I get two different results.

When using XML wiring to create the spring-amqp objects, everything works fine. The connection is re-established, the queue is recreated and reception of messages resumes.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:ctx="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
                        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
                        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

    <rabbit:connection-factory id="connectionFactory" 
        addresses="192.168.1.10:5672" 
        username="guest" 
        password="guest" 
    />

    <rabbit:admin connection-factory="connectionFactory"/> 

    <rabbit:queue name="testQueue" id="testQueue" auto-declare="true" auto-delete="true" exclusive="false"/>

    <rabbit:fanout-exchange name="testExchange" id="testExchange" >
        <rabbit:bindings>
            <rabbit:binding queue="testQueue" />
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <bean class="TestConsumer" id="testConsumer" />

    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="testQueue" ref="testConsumer" method="inbound" />
    </rabbit:listener-container>

</beans>

However, when using Java to create the spring-amqp objects, I run into issues. The connection is re-established, but then I immediately hit exceptions reporting that the queue is not found. The admin does not attempt to re-create the queue and after a few exceptions, the listener container is stopped.

public static void main(String[] args) {

    CachingConnectionFactory cf = new CachingConnectionFactory("192.168.1.10", 5672);

    RabbitAdmin admin = new RabbitAdmin(cf);

    FanoutExchange testExchange = new FanoutExchange("testExchange", true, false);
    admin.declareExchange(testExchange);

    Queue testQueue = new Queue("testQueue", true, false, true);
    admin.declareQueue(testQueue);

    admin.declareBinding(BindingBuilder.bind(testQueue).to(testExchange));

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(cf);
    container.setRabbitAdmin(admin);

    container.setQueues(testQueue);
    container.setMessageListener(new MessageListenerAdapter() {
        public void handleMessage(String text) {
            System.out.println("Received : " + text);
        }
    });
    container.afterPropertiesSet();

    container.start();

    try {
        Thread.sleep(600000L);
    } catch(Exception e) {
        e.printStackTrace();
    }

    container.stop();
    container.destroy();

    System.out.println("Exiting");
}

This is the exception that I see (three or four times) after the connection is re-established and before the listener container exits:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'testQueue' in vhost '/', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:552)

I would assume that the XML wiring case and the pure Java case should work equivalently, but clearly this is not the case. I am not sure if there is something that I am missing in my Java case, or whether this is just not supported.

I can of course detect the failure of the listener container and use the admin to re-declare the queues, but seeing as this is not necessary with the XML wired example, I am wondering what I might be missing.

Using org.springframework.amqp.spring-rabbit 1.4.5.RELEASE


There are 2 answers

Answer by Gary Russell (best answer)

The admin needs a Spring Application Context to automatically declare the elements (queues etc).

It registers a connection listener with the connection factory and, when a new connection is established, it looks up all the queues, exchanges and bindings in the application context and declares them on the broker.

Since you are using "pure" java (no Spring Application Context), this mechanism can't work.

You can either use Spring Java Configuration to replace your XML (using @Bean definitions), or you will have to register your own class with the connection factory to perform the declarations.
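For the first option, a Java configuration roughly equivalent to the XML in the question might look like the sketch below. The class name RabbitConfig and the nested TestConsumer are placeholders (the question does not show TestConsumer's source, only that it has an inbound method), and this has not been verified against 1.4.5 specifically. Because the queue, exchange and binding are beans in the application context, the admin can find them and declare them again when a new connection is established.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    // Placeholder for the TestConsumer referenced in the XML; only the
    // method name "inbound" comes from the question.
    public static class TestConsumer {
        public void inbound(String text) {
            System.out.println("Received : " + text);
        }
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cf = new CachingConnectionFactory("192.168.1.10", 5672);
        cf.setUsername("guest");
        cf.setPassword("guest");
        return cf;
    }

    // The admin is a bean, so it runs inside an application context and can
    // look up the Queue, Exchange and Binding beans declared below.
    @Bean
    public RabbitAdmin rabbitAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public Queue testQueue() {
        // durable = true, exclusive = false, autoDelete = true (as in the question)
        return new Queue("testQueue", true, false, true);
    }

    @Bean
    public FanoutExchange testExchange() {
        return new FanoutExchange("testExchange", true, false);
    }

    @Bean
    public Binding testBinding() {
        return BindingBuilder.bind(testQueue()).to(testExchange());
    }

    @Bean
    public SimpleMessageListenerContainer container() {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory());
        container.setQueues(testQueue());
        container.setMessageListener(new MessageListenerAdapter(new TestConsumer(), "inbound"));
        return container;
    }

    public static void main(String[] args) {
        // Loading the context starts the container; a reachable broker is required.
        new AnnotationConfigApplicationContext(RabbitConfig.class);
    }
}
```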

See the RabbitAdmin source code for how it registers the listener, and its initialize() method for how it performs the declarations.

You would simply need to do your

FanoutExchange testExchange = new FanoutExchange("testExchange", true, false);
admin.declareExchange(testExchange);

Queue testQueue = new Queue("testQueue", true, false, true);
admin.declareQueue(testQueue);

within the listener.
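Put together, registering your own listener could look roughly like the sketch below. The class name RedeclaringListenerExample is a placeholder, and the binding declaration is taken from the question's code, since an auto-delete queue that is recreated presumably needs its binding recreated as well. Treat it as an outline under those assumptions, not a tested implementation.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;

public class RedeclaringListenerExample {

    public static void main(String[] args) {
        CachingConnectionFactory cf = new CachingConnectionFactory("192.168.1.10", 5672);
        final RabbitAdmin admin = new RabbitAdmin(cf);

        final FanoutExchange testExchange = new FanoutExchange("testExchange", true, false);
        final Queue testQueue = new Queue("testQueue", true, false, true);
        final Binding testBinding = BindingBuilder.bind(testQueue).to(testExchange);

        // Invoked by the connection factory whenever it creates a connection,
        // including the new connection opened after a broker restart.
        cf.addConnectionListener(new ConnectionListener() {
            @Override
            public void onCreate(Connection connection) {
                admin.declareExchange(testExchange);
                admin.declareQueue(testQueue);
                admin.declareBinding(testBinding);
            }

            @Override
            public void onClose(Connection connection) {
                // nothing to clean up in this sketch
            }
        });

        // Create and start the SimpleMessageListenerContainer here,
        // exactly as in the question's main method.
    }
}
```

Register the listener before the container is started, so that it is already in place when the connection is opened.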

Answer by R. Oosterholt

Spring AMQP only redeclares queues and exchanges that are registered as beans in the application context (for example with the @Bean annotation).
Queues and exchanges declared directly through the RabbitAdmin are not redeclared automatically after a reconnect. To have them redeclared, set the admin's redeclareManualDeclarations property to true before declaring them.

Example:

RabbitAdmin admin = new RabbitAdmin(cf);

// redeclare manually declared queues, exchanges and bindings when a new connection is established
admin.setRedeclareManualDeclarations(true);

FanoutExchange testExchange = new FanoutExchange("testExchange", true, false);
admin.declareExchange(testExchange);

Queue testQueue = new Queue("testQueue", true, false, true);
admin.declareQueue(testQueue);

Note: you need spring-rabbit 2.4+.