Spring Cloud Stream routing-expression for a dedicated Binding

1.9k views Asked by At

I would like to have a binding that connects to a specific queue/topic and routes to the right function based on a specific header entry.

I could not find any example for this case. I have tried several approaches, but with none of them I had success.

This for example didn't work:

spring:
  cloud:
    function:
      routing:
        enabled: true
    stream:
      function:
        routing:
          enabled: true
        definition: myConsumer;myOtherConsumer;
        bindings:
          myConsumer-in-0:
            destination: myTopic
            group:  myGroup
            binder: myBroker
            routing-expression: "headers['MyRoutingInfo'] == 'even' ? 'myEvenConsumer' : 'myOddConsumer'"
          myOtherConsumer-in-0: #without specific routing

Every concrete example is appreciated

4

There are 4 answers

0
MikeR13 On

I finally found a way to achieve my goal. But I'm not sure wheter this is THE way to do it:

    spring:
      cloud:
        function:
          routing:
            enabled: true
          routing-expression: "headers['MyRouting'] == 'odd' ? 'oddConsumer' : 'evenConsumer'"
        stream:
          function:
            definition: myConsumer;myOtherConsumer;
            bindings:
              myConsumer-in-0:
                destination: myTopic
                group:  myGroup
                binder: myBroker
              myOtherConsumer-in-0: #without specific routing

with the following beans:

@Bean
public Consumer<Message<byte[]>> myConsumer(final RoutingFunction routingFunction) {
        return message -> {
           LOG.info("Sending to routingFunction");
           routingFunction.apply(message);
        };
}

@Bean
public Consumer<byte[]> evenConsumer() {
      return (payload) -> LOG.info("even got: {}", new String(payload));
}

@Bean
public Consumer<byte[]> oddConsumer() {
    return (payload) -> LOG.info("odd got: {}", new String(payload));
}
2
Gary Russell On

Consumers don't "route" messages, they consume from queues. Producers route messages using s.c.s.rabbit.bindings.producer-out-0.producer.routing-key-expression.

0
VaibS On

To enable routing, by default the binding with name functionRouter would be created.

As per docs:

The RoutingFunction is registered in FunctionCatalog under the name functionRouter. For simplicity and consistency you can also refer to RoutingFunction.FUNCTION_NAME constant.

Below config should work fine:

spring:
  cloud:
    stream:
      function:
        definition: functionRouter;
        routing:
          enabled: true
      kafka:
        binder:
          brokers:
            - localhost:9092
      bindings:
        functionRouter-in-0:
          destination: my.topic
          group: my.topic.group
    function:
      routing-expression: "headers['type'] == 'even' ? 'evenConsumer' : 'oddConsumer'"

You don't need to create even and odd consumer function definition.

0
Terrak Dev On

You actually do not need to provide the spring.cloud.stream.function.routing.enabled=true parameter in application.properties file in order to make routing work, because it automatically works as soon as you provide the routing-expression parameter - see: spring cloud stream documentation