How to bind Spring cloud stream binding for producer and consumer?

1.8k views Asked by At

I am new to Spring cloud and RabbitMQ. I am trying to produce and consume messages from Rabbit MQ. I am having difficulty in configuring exchange name and queue name for producer and consumer.

I want to connect to the existing exchange called to order and the existing queue called myQueue

Below is the application.properties

spring.rabbitmq.addresses=amqp://user:pass@localhost:5672/
spring.cloud.stream.function.bindings.processTable-out-0=order

spring.cloud.stream.function.bindings.processTable-in-0=order
spring.cloud.stream.bindings.order.group=myQueue
spring.cloud.stream.bindings.order.content-type=application/json

Above configuration is connecting to order exchange, however, it is creating and connecting a new queue called order.myQueue.

The consumer method is below.

  @Bean
  public Consumer<String> processTable(){
      log.info("Conuming message......................");
      Consumer<String> consumer = (request)-> System.out.println(request);
      System.out.println(consumer);
      return consumer;
  }

What do I need to tweak in application.properties to connect to order exchange and myOrder queue?

2

There are 2 answers

0
Gary Russell On BEST ANSWER

See Using Existing Queues/Exchanges.

If you have an existing exchange/queue that you wish to use, you can completely disable automatic provisioning as follows, assuming the exchange is named myExchange and the queue is named myQueue:

  • spring.cloud.stream.bindings..destination=myExchange

  • spring.cloud.stream.bindings..group=myQueue

  • spring.cloud.stream.rabbit.bindings..consumer.bindQueue=false

  • spring.cloud.stream.rabbit.bindings..consumer.declareExchange=false

  • spring.cloud.stream.rabbit.bindings..consumer.queueNameGroupOnly=true

0
Musaddique S On

I am going to write the new way of rabbitmq integration in spring boot with stream.

there are 3 types of request and response via rabbitmq

  1. you send something to rabbitmq and expect some response from it -> Function
  2. you send message to rabbitmq and expect nothing -> Producer
  3. you consume message from rabbitmq -> Consumer

#1. I am writing example of Function which will work as both producer and consumer

    @Component
    public class RequestResponseFunction implements Function {
        @Override
        public ResponseMessageapply(RequestMessage request) {
            //code logic to process request and return Response type object
          return response;
        }
    }

#2. Producer :

@Component
@Slf4j
public class MessageDispacher implements Supplier>> {
    private Sinks.Many> sink = Sinks.many().unicast().onBackpressureBuffer();

    @Override
    public Flux> get() {
        return sink.asFlux()
                .doOnNext(m -> log.info("Manually sending message {}", m))
                .doOnError(t -> log.error("Error encountered", t));
    }

    public void dispatch(MyMessage myMessage){
        Message message = MessageBuilder.withPayload(myMessage).build();
        sink.emitNext(message, Sinks.EmitFailureHandler.FAIL_FAST);
    }
}

#3. Consumer :

@Component
public class RequestConsumer implements Consumer {
    @Override
    public void accept(RequestMessage requestMessage) {
       //code logic to process request 
    }
}

application.yaml

spring:
  rabbitmq:
    connection-timeout: 5s
    host: "${RABBITMQ_HOST}"
    username: "${RABBITMQ_USERNAME}"
    password: "${RABBITMQ_PASSWORD}"
    port: 5672
  cloud:
    function:
      definition: responseFunction;messageDispacher;requestConsumer
    stream:
      bindings:
        responseFunction-in-0:
          destination: "myExchange"
          group: "${spring.application.name}"
          content-type: "application/json"
        responseFunction-out-0:
          destination: "myExchange"
          content-type: "application/json"
        messageDispacher-out-0:
          destination: "myExchange"
          content-type: "application/json"
        requestConsumer-in-0:
          destination: "myExchange"
          content-type: "application/json"
          group: "${spring.application.name}"