Does Spring Rabbitmq StreamListenerContainer support Jackson2JsonMessageConverter

50 views Asked by At

I have a producer that publish message to Rabbitmq 3.12 stream, where the producer publishing class Invoice. In the consumer, I have this configuration.

@Configuration
public class RabbitmqStreamJsonConfig {

  @Bean
  ObjectMapper objectMapper() {
    return JsonMapper.builder().findAndAddModules().build();
  }

  @Bean
  Jackson2JsonMessageConverter converter(@Autowired ObjectMapper objectMapper) {
    return new Jackson2JsonMessageConverter(objectMapper);
  }

  @Bean
  RabbitListenerContainerFactory<StreamListenerContainer> invoiceContainerFactory(Environment env) {
    var factory = new StreamRabbitListenerContainerFactory(env);
    factory.setNativeListener(false);
    factory.setConsumerCustomizer((id, builder) -> {
      builder.name("invoice-consumer").offset(OffsetSpecification.first()).singleActiveConsumer()
          .autoTrackingStrategy();
    });
    return factory;
  }
}

And this is the consumer class.

@Service
public class InvoiceConsumer {

  private static final Logger LOG = LoggerFactory.getLogger(InvoiceConsumer.class);

  @RabbitListener(queues = "s.invoice")
  void listenDefault(Invoice str) throws JsonMappingException, JsonProcessingException {
    LOG.info("listenDefault : {}", str);
  }

  @RabbitListener(queues = "s.invoice", containerFactory = "invoiceContainerFactory")
  void listenWithContainerFactory(Invoice str) throws JsonMappingException, JsonProcessingException {
    LOG.info("listenWithContainerFactory : {}", str);
  }

}

The listenDefault method works as expected: the received message directly converted to class Invoice and displayed on console. However, the listenWithContainerFactory always throws exception

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [void com.rabbitmq.stream.consumer.InvoiceConsumer.listenWithContainerFactory(com.rabbitmq.stream.entity.Invoice) throws com.fasterxml.jackson.databind.JsonMappingException,com.fasterxml.jackson.core.JsonProcessingException]
Bean [com.rabbitmq.stream.consumer.InvoiceConsumer@46b7195f]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:281) ~[spring-rabbit-3.0.10.jar:3.0.10]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:224) ~[spring-rabbit-3.0.10.jar:3.0.10]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:149) ~[spring-rabbit-3.0.10.jar:3.0.10]
    at org.springframework.rabbit.stream.listener.StreamListenerContainer.lambda$setupMessageListener$5(StreamListenerContainer.java:304) ~[spring-rabbit-stream-3.0.10.jar:3.0.10]
    at io.micrometer.observation.Observation.lambda$observe$0(Observation.java:493) ~[micrometer-observation-1.11.5.jar:1.11.5]
    at io.micrometer.observation.Observation.observeWithContext(Observation.java:603) ~[micrometer-observation-1.11.5.jar:1.11.5]
    at io.micrometer.observation.Observation.observe(Observation.java:492) ~[micrometer-observation-1.11.5.jar:1.11.5]
    at org.springframework.rabbit.stream.listener.StreamListenerContainer.lambda$setupMessageListener$7(StreamListenerContainer.java:302) ~[spring-rabbit-stream-3.0.10.jar:3.0.10]
    at com.rabbitmq.stream.impl.StreamConsumer.lambda$new$0(StreamConsumer.java:112) ~[stream-client-0.9.0.jar:0.9.0]
    at com.rabbitmq.stream.impl.StreamConsumer.lambda$new$2(StreamConsumer.java:137) ~[stream-client-0.9.0.jar:0.9.0]
    at com.rabbitmq.stream.impl.StreamConsumer.lambda$new$7(StreamConsumer.java:240) ~[stream-client-0.9.0.jar:0.9.0]
    at com.rabbitmq.stream.impl.ConsumersCoordinator$ClientSubscriptionsManager.lambda$new$3(ConsumersCoordinator.java:572) ~[stream-client-0.9.0.jar:0.9.0]
    at com.rabbitmq.stream.impl.ServerFrameHandler$DeliverVersion1FrameHandler.handleMessage(ServerFrameHandler.java:348) ~[stream-client-0.9.0.jar:0.9.0]
    at com.rabbitmq.stream.impl.ServerFrameHandler$DeliverVersion1FrameHandler.handleDeliver(ServerFrameHandler.java:475) ~[stream-client-0.9.0.jar:0.9.0]
    at com.rabbitmq.stream.impl.ServerFrameHandler$DeliverVersion2FrameHandler.doHandle(ServerFrameHandler.java:596) ~[stream-client-0.9.0.jar:0.9.0]
    at com.rabbitmq.stream.impl.ServerFrameHandler$BaseFrameHandler.handle(ServerFrameHandler.java:262) ~[stream-client-0.9.0.jar:0.9.0]
    at com.rabbitmq.stream.impl.Client$StreamHandler.lambda$channelRead$1(Client.java:2526) ~[stream-client-0.9.0.jar:0.9.0]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.rabbitmq.stream.entity.Invoice] for GenericMessage [payload=byte[79], headers={amqp_expiration=0, amqp_contentEncoding=UTF-8, id=4ad70fcd-ab31-df11-994e-97870c17f1c6, amqp_lastInBatch=false, contentType=application/json, __TypeId__=com.rabbitmq.stream.entity.Invoice, timestamp=1699113082704}]
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:151) ~[spring-messaging-6.0.13.jar:6.0.13]
    at org.springframework.amqp.rabbit.listener.adapter.AmqpMessageHandlerMethodFactory$OptionalEmptyAwarePayloadArgumentResolver.resolveArgument(AmqpMessageHandlerMethodFactory.java:99) ~[spring-rabbit-3.0.10.jar:3.0.10]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) ~[spring-messaging-6.0.13.jar:6.0.13]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-6.0.13.jar:6.0.13]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-6.0.13.jar:6.0.13]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:75) ~[spring-rabbit-3.0.10.jar:3.0.10]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:277) ~[spring-rabbit-3.0.10.jar:3.0.10]
    ... 21 common frames omitted

I think this is because my RabbitListenerContainerFactory<StreamListenerContainer> invoiceContainerFactory from configuration above does not inject any MessageConverter (should be Jackson2JsonMessageConverter, since the listenDefault works). But I can't find any method on the StreamRabbitListenerContainerFactory to inject the message converter. Does RabbitListenerContainerFactory<StreamListenerContainer> does not support JSON conversion?

0

There are 0 answers