Spring Boot Consumer Class should be producer as well/ rabbitmq

44 views Asked by At

i am using rabbitmq and springBoot. I have a consumer that is currently listening to one queue and should send a new message to another queue, depending on the received message. Thats my Problem. I hope you can help me. I am not sending the Messages via Rest but directly with the convertAndSend Method. The receiving of the messages works fine.

Consumer:

import org.json.JSONException;
import org.json.JSONObject;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.server.ConfigurableWebServerFactory;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@EnableRabbit
public class SpringConsumer {

    private static final String QUEUE_NAME = "spring-boot4";
    private static final String OTHER_QUEUE_NAME = "resonse_queue";

    public static void main(String[] args) {
        SpringApplication.run(SpringConsumer.class, args);
    }


    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String message) {
        try {
            JSONObject json = new JSONObject(message);
            if ((boolean) json.get("cool")) {
                System.out.println("Neue 'wahre' Nachricht empfangen: " + json.toString());
                JSONObject responseJson = new JSONObject();
                responseJson.put("response", "success");
                //Here the response should be send
            } else {
                System.out.println("Neue 'falsche' Nachricht empfangen: " + json.toString());
            }

        } catch (JSONException e) {
            e.printStackTrace();
        }
    }
    

    @Bean
    public WebServerFactoryCustomizer<ConfigurableWebServerFactory> webServerFactoryCustomizer() {
        return factory -> factory.setPort(9090); // Hier wird der Port auf 9090 gesetzt
    }
}

And my Producer:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class MessagingRabbitmqApplication {

    static final String topicExchangeName = "spring-boot-exchange";

    static final String queueName = "spring-boot4";

    @Bean
    Queue queue() {
        return new Queue(queueName, true);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        container.setAutoStartup(false); // Set autoStartup to false
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(MessagingRabbitmqApplication.class, args).close();
    }

}

Thanks for youre help in advance Paul

1

There are 1 answers

1
lord rufusus On

So the following does not work

@SpringBootApplication
@EnableRabbit
public class SpringConsumer {

    private static final String QUEUE_NAME = "spring-boot4";
    private static final String OTHER_QUEUE_NAME = "other-queue";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public static void main(String[] args) {
        SpringApplication.run(SpringConsumer.class, args);
    }

    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String message) {
        try {
            JSONObject json = new JSONObject(message);
            if ((boolean) json.get("cool")) {
                System.out.println("Neue 'wahre' Nachricht empfangen: " + json.toString());
                // Beispiel: Sende eine neue Nachricht an eine andere Queue
                JSONObject responseJson = new JSONObject();
                responseJson.put("response", "success");
                rabbitTemplate.convertAndSend(OTHER_QUEUE_NAME, responseJson.toString());
            } else {
                System.out.println("Neue 'falsche' Nachricht empfangen: " + json.toString());
            }
        } catch (JSONException e) {
            e.printStackTrace();
        }
    }

    @Bean
    public WebServerFactoryCustomizer<ConfigurableWebServerFactory> webServerFactoryCustomizer() {
        return factory -> factory.setPort(9090);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
}

It says:

APPLICATION FAILED TO START


Description:

The dependencies of some of the beans in the application context form a cycle:

tomcatServletWebServerFactory defined in class path resource [org/springframework/boot/autoconfigure/web/servlet/ServletWebServerFactoryConfiguration$EmbeddedTomcat.class] ┌─────┐ | springConsumer (field private org.springframework.amqp.rabbit.core.RabbitTemplate SpringConsumerResponder_akaDirk.SpringConsumer.rabbitTemplate) └─────┘

Action:

Relying upon circular references is discouraged and they are prohibited by default. Update your application to remove the dependency cycle between beans. As a last resort, it may be possible to break the cycle automatically by setting spring.main.allow-circular-references to true.