Reactor rabbitmq AlreadyClosedException


In my project I am using Spring Boot 2.1.2.RELEASE and reactor-rabbitmq 1.0.0.RELEASE. I create a Rabbit receiver, subscribe to it, and process messages with manual ack. After some time (anywhere from an hour to 1-2 days of running) I get a missed-heartbeat error, then the channel closes and I get "com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.IOException: Connection reset by peer", and my receiver no longer receives messages. It only works again after a restart.

The Rabbit client has connectionFactory.setAutomaticRecoveryEnabled(true); and connectionFactory.setTopologyRecoveryEnabled(true); so it should recover automatically by default, but it doesn't.

public void startReceiver(int parallelism) {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.useNio();
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setVirtualHost("/");
    connectionFactory.setRequestedHeartbeat(10);

    Address[] addresses = {new Address("localhost")};
    ReceiverOptions receiverOptions = new ReceiverOptions()
            .connectionFactory(connectionFactory)
            .connectionSupplier(cf -> cf.newConnection(addresses, "receiver"))
            .connectionSubscriptionScheduler(Schedulers.elastic());

    Receiver receiver = RabbitFlux.createReceiver(receiverOptions);
    receiver.consumeManualAck("test-data", new ConsumeOptions().qos(200))
            .doOnSubscribe(s -> System.out.println("Receiver started."))
            .retry()
            .parallel(parallelism)
            .runOn(Schedulers.newParallel("parallel-receiver", parallelism))
            .doOnNext(d -> processMessage(d))
            .subscribe();
}

private void processMessage(AcknowledgableDelivery message) {
    try {
        //some processing
    } catch (Exception e) {
        e.printStackTrace();
    }
    message.ack();
}

I am getting these errors:

com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 10 seconds
    at com.rabbitmq.client.impl.AMQConnection.handleHeartbeatFailure(AMQConnection.java:686) ~[amqp-client-5.5.1.jar!/:5.5.1]
    at com.rabbitmq.client.impl.nio.NioLoop.lambda$handleHeartbeatFailure$0(NioLoop.java:273) [amqp-client-5.5.1.jar!/:5.5.1]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_181]

reactor.rabbitmq.Receiver : Cancelling consumer amq.ctag--x02OWhVo3_DPutsPQ0qDw consuming from test-data

reactor.core.Exceptions$ErrorCallbackNotImplemented: com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.IOException: Connection reset by peer
Caused by: com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.IOException: Connection reset by peer
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:257) ~[amqp-client-5.5.1.jar!/:5.5.1]
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:426) ~[amqp-client-5.5.1.jar!/:5.5.1]
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:420) ~[amqp-client-5.5.1.jar!/:5.5.1]
    at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:93) ~[amqp-client-5.5.1.jar!/:5.5.1]
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:428) ~[amqp-client-5.5.1.jar!/:5.5.1]
    at reactor.rabbitmq.AcknowledgableDelivery.ack(AcknowledgableDelivery.java:56) ~[reactor-rabbitmq-1.0.0.RELEASE.jar!/:1.0.0.RELEASE]
    at reactor.rabbitmq.AcknowledgableDelivery.ack(AcknowledgableDelivery.java:73) ~[reactor-rabbitmq-1.0.0.RELEASE.jar!/:1.0.0.RELEASE]
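
Two things stand out in the last trace. The exception is thrown from AcknowledgableDelivery.ack(), which processMessage calls outside its try/catch. It also surfaces as ErrorCallbackNotImplemented, which Reactor raises when subscribe() is called without an error consumer. The retry() operator sits upstream of doOnNext, so it would not be expected to see an error thrown there. Below is a minimal sketch of guarding the ack so a closed connection does not terminate the pipeline. It assumes a hypothetical Ackable interface standing in for AcknowledgableDelivery (so it runs without a broker) and a hypothetical safeAck helper; neither is part of reactor-rabbitmq. It may not explain why the consumer is not re-established after recovery; it only keeps a failed ack from escaping.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for reactor.rabbitmq.AcknowledgableDelivery,
// used here only so the sketch runs without a broker.
interface Ackable {
    void ack();
}

class SafeAckSketch {

    // Hypothetical helper: attempts the ack and hands any failure to a callback
    // instead of letting it propagate into the reactive pipeline.
    // Returns true if the ack succeeded, false otherwise.
    static boolean safeAck(Ackable message, Consumer<RuntimeException> onFailure) {
        try {
            message.ack();
            return true;
        } catch (RuntimeException e) {
            // With the real client this branch would also catch
            // AlreadyClosedException, which is an unchecked exception.
            onFailure.accept(e);
            return false;
        }
    }

    public static void main(String[] args) {
        Ackable healthy = () -> { };
        Ackable closed = () -> {
            throw new IllegalStateException("connection is already closed");
        };

        System.out.println(safeAck(healthy, e -> { }));
        System.out.println(safeAck(closed,
                e -> System.err.println("ack failed: " + e.getMessage())));
    }
}
```

In the code from the question, this would correspond to moving message.ack() inside a try/catch of its own in processMessage. A message whose ack fails stays unacknowledged, so the broker should redeliver it once a consumer is attached again.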
