In my project I am using Springboot version 2.1.2.RELEASE, reactor-rabbitmq version 1.0.0.RELEASE. I am creating rabbit receiver, subscribe it and process messagess with manual ack. But after some time it can be after an hour or after 1-2 days of work, I am getting heartbeat missing error, than channel is closing and I get "com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.IOException: Connection reset by peer" and my receiver no longer receive messages. It works only after restart.
Rabbit client has connectionFactory.setAutomaticRecoveryEnabled(true); and connectionFactory.setTopologyRecoveryEnabled(true);
so it should auto-recovery by default, but it doesn't work.
public void startReceiver(int parallelism) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
connectionFactory.setRequestedHeartbeat(10);
Address[] addresses = {new Address("localhost")};
ReceiverOptions receiverOptions = new ReceiverOptions()
.connectionFactory(connectionFactory)
.connectionSupplier(cf -> cf.newConnection(addresses, "receiver"))
.connectionSubscriptionScheduler(Schedulers.elastic());
Receiver receiver = RabbitFlux.createReceiver(receiverOptions);
receiver.consumeManualAck("test-data", new ConsumeOptions().qos(200))
.doOnSubscribe(s -> System.out.println("Receiver started."))
.retry()
.parallel(parallelism)
.runOn(Schedulers.newParallel("parallel-receiver", parallelism))
.doOnNext(d -> processMessage(d))
.subscribe();
}
private void processMessage(AcknowledgableDelivery message) {
try {
//some processing
} catch (Exception e) {
e.printStackTrace();
}
message.ack();
}
I getting errors
com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 10 seconds, at com.rabbitmq.client.impl.AMQConnection.handleHeartbeatFailure(AMQConnection.java:686) ~[amqp-client-5.5.1.jar!/:5.5.1], at com.rabbitmq.client.impl.nio.NioLoop.lambda$handleHeartbeatFailure$0(NioLoop.java:273) [amqp-client-5.5.1.jar!/:5.5.1], at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_181],
reactor.rabbitmq.Receiver : Cancelling consumer amq.ctag--x02OWhVo3_DPutsPQ0qDw consuming from test-data,
reactor.core.Exceptions$ErrorCallbackNotImplemented: com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.IOException: Connection reset by peer, Caused by: com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.IOException: Connection reset by peer, at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:257) ~[amqp-client-5.5.1.jar!/:5.5.1], at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:426) ~[amqp-client-5.5.1.jar!/:5.5.1], at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:420) ~[amqp-client-5.5.1.jar!/:5.5.1], at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:93) ~[amqp-client-5.5.1.jar!/:5.5.1], at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:428) ~[amqp-client-5.5.1.jar!/:5.5.1], at reactor.rabbitmq.AcknowledgableDelivery.ack(AcknowledgableDelivery.java:56) ~[reactor-rabbitmq-1.0.0.RELEASE.jar!/:1.0.0.RELEASE], at reactor.rabbitmq.AcknowledgableDelivery.ack(AcknowledgableDelivery.java:73) ~[reactor-rabbitmq-1.0.0.RELEASE.jar!/:1.0.0.RELEASE],