Rabbitmq with Smack connection in Spring boot. Consumer stops if rabbitmq faces error

29 views Asked by At

I have a spring boot application which connects with Smack for sending messages. I initialized RabbitMQConfig like this:

@Configuration
public class RabbitMQConfig {

@Bean
public MessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(jsonMessageConverter());
    return template;
}

}

I created a bean for Smack XMPPConnection

@Configuration
public class XmppConfiguration {
@Bean("xmppConnection")
public XMPPConnection xmppConnectionFactoryBean( ApplicationProperties applicationProperties) throws Exception{
    XMPPTCPConnectionConfiguration config = XMPPTCPConnectionConfiguration.builder()
                    .setUsernameAndPassword(user,password)
.setXmppDomain(domain).setHost(host).setSendPresence(false)
                    .setPort(port).build();
            AbstractXMPPConnection connection = new XMPPTCPConnection(config);
            ParsingExceptionCallback handleUnparseableRecord = null;
            connection.setParsingExceptionCallback(handleUnparseableRecord);
            Roster.getInstanceFor(connection).setRosterLoadedAtLogin(false);
            ReconnectionManager manager = ReconnectionManager.getInstanceFor(connection);

        manager.enableAutomaticReconnection();
        manager.setReconnectionPolicy(ReconnectionManager.ReconnectionPolicy.FIXED_DELAY);
        manager.setFixedDelay(5);
        connection.connect();
        connection.login();
        
        // Connection listeners
        connection.addConnectionListener(new ConnectionListener() {
            @Override
            public void connected(XMPPConnection connection) {
                log.info("admin user connected: {}", connection.isConnected());
            }

            @Override
            public void connectionClosed() {
                log.info("admin user connection closed");
                try {
                    connection.connect();
                    connection.login();
                } catch (SmackException | IOException | XMPPException | InterruptedException e1) {
                    log.error("admin user connection error: {}", e1);
                    Thread.currentThread().interrupt();
                }
            }

            @Override
            public void connectionClosedOnError(Exception e) {
                log.info("admin user connection closed on error: {}", e);
                try {
                    connection.connect();
                    connection.login();
                } catch (SmackException | IOException | XMPPException | InterruptedException e1) {
                    log.error("admin user connection error: {}", e1);
                    Thread.currentThread().interrupt();
                }
                
            }

            @Override
            public void authenticated(XMPPConnection connection, boolean resumed) {
                 log.info("admin user authenticated: {}", connection);
                
            }
            
        });
        return connection;
}

}

When xmpp service is stopped and started - connection disconnects and resumes. But meanwhile rabbitmq stops consuming the messages.

Not sure how to handle this.

Getting this error

    2024-02-21 09:01:32,904 ERROR org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer : Stopping container from aborted consumer
2024-02-21 09:01:32,905 INFO org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer : Waiting for workers to finish.
2024-02-21 09:01:32,905 INFO  `org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer : Successfully waited for workers to finish.

I changed to configure RabbitMQ like this

@Configuration
public class RabbitMQConfig implements RabbitListenerConfigurer {

@Bean
MessageHandlerMethodFactory messageHandlerMethodFactory() {
    DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
    messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
    return messageHandlerMethodFactory;
}

@Bean
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
    return new MappingJackson2MessageConverter();
}

public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
    registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
}

}

but still doesn't help

0

There are 0 answers