Client hangs on sending messages to RabbitMQ with Java client

596 views Asked by At

I've implemented a RabbitMQ publisher and consumer in a reactive manner with Java, but my publishing functionality hangs the channel. The queue itself works fine, as do declaring a queue and consuming from it; I've tested this with the admin management UI. When attempting to send more messages, I no longer see logs like "queue declare success" or "delivering message to exchange...". By the way, I know I do not need declareQueue in deliver(), but I added it to verify whether communication on this particular channel works.

My code is:

/**
 * Spring component that wires the RabbitMQ beans for the tasks queue: the raw
 * {@link ConnectionFactory}, the reactive {@link Sender} and {@link Receiver},
 * the auto-acknowledging delivery stream and an {@link AmqpAdmin}.
 *
 * <p>All settings (hosts, credentials, queue name, prefetch count, connection
 * closing timeout) are read from the injected {@link TasksQueueConfig}.
 */
@Slf4j
@Component
public class RabbitConfigurator {
    private final TasksQueueConfig cfg;
    private final List<Address> addresses;
    private final Utils.ExceptionFunction<ConnectionFactory, Connection> connSupplier;

    /**
     * Captures the configuration and prepares the connection supplier shared by the
     * sender and the receiver.
     *
     * @param cfg tasks queue configuration; every configured host becomes a broker address
     */
    public RabbitConfigurator(TasksQueueConfig cfg) {
        this.cfg = cfg;
        this.addresses = cfg
                .getHosts()
                .stream()
                .map(Address::new)
                .collect(Collectors.toList());
        // Connections opened through this supplier use the full address list and the
        // connection name "dmTasksProc".
        this.connSupplier = cf -> {
            LOG.info("initializing new RabbitMQ connection");
            return cf.newConnection(this.addresses, "dmTasksProc");
        };
    }

    /**
     * Builds the RabbitMQ connection factory from the first configured host, port 5672
     * and the configured credentials.
     *
     * @return a configured connection factory
     */
    @Bean
    public ConnectionFactory rabbitMQConnectionFactory() {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost(cfg.getHosts().get(0));
        cf.setPort(5672);
        cf.setUsername(cfg.getUsername());
        cf.setPassword(cfg.getPassword());
        return cf;
    }

    /**
     * Creates the reactive sender.
     *
     * @param connFactory connection factory handed to the connection supplier
     * @return a sender whose connection is obtained through {@code connSupplier}
     */
    @Bean
    public Sender sender(ConnectionFactory connFactory) {
        SenderOptions sendOpts = new SenderOptions()
                .connectionClosingTimeout(Duration.parse(cfg.getConnectionTimeout()))
                .connectionFactory(connFactory)
                .connectionSupplier(connSupplier)
                // boundedElastic() replaces the deprecated elastic() scheduler.
                .connectionSubscriptionScheduler(Schedulers.boundedElastic());
        return RabbitFlux.createSender(sendOpts);
    }

    /**
     * Creates the reactive receiver, configured the same way as the sender.
     *
     * @param connFactory connection factory handed to the connection supplier
     * @return a receiver whose connection is obtained through {@code connSupplier}
     */
    @Bean
    public Receiver receiver(ConnectionFactory connFactory) {
        ReceiverOptions recOpts = new ReceiverOptions()
                .connectionClosingTimeout(Duration.parse(cfg.getConnectionTimeout()))
                .connectionFactory(connFactory)
                .connectionSupplier(connSupplier)
                // boundedElastic() replaces the deprecated elastic() scheduler.
                .connectionSubscriptionScheduler(Schedulers.boundedElastic());
        return RabbitFlux.createReceiver(recOpts);
    }

    /**
     * Exposes the auto-acknowledged delivery stream of the configured queue, using the
     * configured prefetch count as QoS.
     *
     * @param receiver receiver used to consume the queue
     * @return the delivery stream of the queue named by the configuration
     */
    @Bean
    public Flux<Delivery> deliveryFlux(Receiver receiver) {
        ConsumeOptions consumeOpts = new ConsumeOptions().qos(cfg.getPrefetchCount());
        return receiver.consumeAutoAck(cfg.getName(), consumeOpts);
    }

    /**
     * Creates the AMQP admin on top of a caching wrapper around the given connection factory.
     *
     * @param connFactory underlying RabbitMQ connection factory
     * @return an admin bound to that connection factory
     */
    @Bean
    public AmqpAdmin rabbitAmqpAdmin(ConnectionFactory connFactory) {
        return new RabbitAdmin(new CachingConnectionFactory(connFactory));
    }
}

and the consumer/publisher:

@Slf4j
@Service
public class TasksQueue implements DisposableBean {
    private TasksQueueConfig cfg;
    private ObjectMapper mapper;
    private Flux<Delivery> deliveryFlux;
    private Receiver receiver;
    private Sender sender;

    private Disposable consumer;

    public TasksQueue(TasksQueueConfig cfg, AmqpAdmin amqpAdmin, ObjectMapper mapper, Flux<Delivery> deliveryFlux,
                         Receiver receiver, Sender sender) {
        this.cfg = cfg;
        this.mapper = mapper;
        this.deliveryFlux = deliveryFlux;
        this.receiver = receiver;
        this.sender = sender;

        amqpAdmin.declareQueue(new Queue(cfg.getName(), false, false, false));
        consumer = consume();
    }

    public Mono<Void> deliver(Flux<Task> tasks) {
        var pub = sender.sendWithPublishConfirms(
                        tasks.map(task -> {
                            try {
                                String exchange = "";
                                LOG.debug("delivering message to exchange='{}', routingKey='{}'", exchange, cfg.getName());
                                return new OutboundMessage(exchange, cfg.getName(), mapper.writeValueAsBytes(task));
                            } catch(JsonProcessingException ex) {
                                throw Exceptions.propagate(ex);
                            }
                        }));

        return sender.declareQueue(QueueSpecification.queue(cfg.getName()))
                .flatMap(declareOk -> {
                    LOG.info("queue declare success");
                    return Mono.just(declareOk);
                })
                .thenMany(pub)
                .doOnError(JsonProcessingException.class, ex -> LOG.error("Cannot prepare queue message:", ex))
                .doOnError(ex -> LOG.error("Failed to send task to the queue:", ex))
                .map(res -> {
                    if(res.isAck()) {
                        LOG.info("Message {} sent successfully", new String(res.getOutboundMessage().getBody()));
                        return res;
                    } else {
                        LOG.info("todo");
                        return res;
                    }
                })
                .then();
    }

    private Disposable consume() {
        return deliveryFlux
                .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1)))
                .doOnError(err -> {
                    LOG.error("tasks consumer error", err);
                })
                .subscribe(m -> {
                    LOG.info("Received message {}", new String(m.getBody()));
                });
    }

    @Override
    public void destroy() throws Exception {
        LOG.info("Cleaning up tasks queue resources");
        consumer.dispose();
        receiver.close();
        sender.close();
    }
}

Five minutes after attempting to send a message, I get the following log:

r.r.ChannelCloseHandlers$SenderChannelCloseHandler:47: closing channel 1 by signal cancel
r.r.ChannelCloseHandlers$SenderChannelCloseHandler:53: Channel 1 didn't close normally: null

Big thanks for input in advance!

0

There are 0 answers