I have a producer that publish message to Rabbitmq 3.12 stream, where the producer publishing class Invoice.
In the consumer, I have this configuration.
@Configuration
public class RabbitmqStreamJsonConfig {
@Bean
ObjectMapper objectMapper() {
return JsonMapper.builder().findAndAddModules().build();
}
@Bean
Jackson2JsonMessageConverter converter(@Autowired ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> invoiceContainerFactory(Environment env) {
var factory = new StreamRabbitListenerContainerFactory(env);
factory.setNativeListener(false);
factory.setConsumerCustomizer((id, builder) -> {
builder.name("invoice-consumer").offset(OffsetSpecification.first()).singleActiveConsumer()
.autoTrackingStrategy();
});
return factory;
}
}
And this is the consumer class.
@Service
public class InvoiceConsumer {
private static final Logger LOG = LoggerFactory.getLogger(InvoiceConsumer.class);
@RabbitListener(queues = "s.invoice")
void listenDefault(Invoice str) throws JsonMappingException, JsonProcessingException {
LOG.info("listenDefault : {}", str);
}
@RabbitListener(queues = "s.invoice", containerFactory = "invoiceContainerFactory")
void listenWithContainerFactory(Invoice str) throws JsonMappingException, JsonProcessingException {
LOG.info("listenWithContainerFactory : {}", str);
}
}
The listenDefault method works as expected: the received message directly converted to class Invoice and displayed on console.
However, the listenWithContainerFactory always throws exception
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [void com.rabbitmq.stream.consumer.InvoiceConsumer.listenWithContainerFactory(com.rabbitmq.stream.entity.Invoice) throws com.fasterxml.jackson.databind.JsonMappingException,com.fasterxml.jackson.core.JsonProcessingException]
Bean [com.rabbitmq.stream.consumer.InvoiceConsumer@46b7195f]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:281) ~[spring-rabbit-3.0.10.jar:3.0.10]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:224) ~[spring-rabbit-3.0.10.jar:3.0.10]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:149) ~[spring-rabbit-3.0.10.jar:3.0.10]
at org.springframework.rabbit.stream.listener.StreamListenerContainer.lambda$setupMessageListener$5(StreamListenerContainer.java:304) ~[spring-rabbit-stream-3.0.10.jar:3.0.10]
at io.micrometer.observation.Observation.lambda$observe$0(Observation.java:493) ~[micrometer-observation-1.11.5.jar:1.11.5]
at io.micrometer.observation.Observation.observeWithContext(Observation.java:603) ~[micrometer-observation-1.11.5.jar:1.11.5]
at io.micrometer.observation.Observation.observe(Observation.java:492) ~[micrometer-observation-1.11.5.jar:1.11.5]
at org.springframework.rabbit.stream.listener.StreamListenerContainer.lambda$setupMessageListener$7(StreamListenerContainer.java:302) ~[spring-rabbit-stream-3.0.10.jar:3.0.10]
at com.rabbitmq.stream.impl.StreamConsumer.lambda$new$0(StreamConsumer.java:112) ~[stream-client-0.9.0.jar:0.9.0]
at com.rabbitmq.stream.impl.StreamConsumer.lambda$new$2(StreamConsumer.java:137) ~[stream-client-0.9.0.jar:0.9.0]
at com.rabbitmq.stream.impl.StreamConsumer.lambda$new$7(StreamConsumer.java:240) ~[stream-client-0.9.0.jar:0.9.0]
at com.rabbitmq.stream.impl.ConsumersCoordinator$ClientSubscriptionsManager.lambda$new$3(ConsumersCoordinator.java:572) ~[stream-client-0.9.0.jar:0.9.0]
at com.rabbitmq.stream.impl.ServerFrameHandler$DeliverVersion1FrameHandler.handleMessage(ServerFrameHandler.java:348) ~[stream-client-0.9.0.jar:0.9.0]
at com.rabbitmq.stream.impl.ServerFrameHandler$DeliverVersion1FrameHandler.handleDeliver(ServerFrameHandler.java:475) ~[stream-client-0.9.0.jar:0.9.0]
at com.rabbitmq.stream.impl.ServerFrameHandler$DeliverVersion2FrameHandler.doHandle(ServerFrameHandler.java:596) ~[stream-client-0.9.0.jar:0.9.0]
at com.rabbitmq.stream.impl.ServerFrameHandler$BaseFrameHandler.handle(ServerFrameHandler.java:262) ~[stream-client-0.9.0.jar:0.9.0]
at com.rabbitmq.stream.impl.Client$StreamHandler.lambda$channelRead$1(Client.java:2526) ~[stream-client-0.9.0.jar:0.9.0]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.rabbitmq.stream.entity.Invoice] for GenericMessage [payload=byte[79], headers={amqp_expiration=0, amqp_contentEncoding=UTF-8, id=4ad70fcd-ab31-df11-994e-97870c17f1c6, amqp_lastInBatch=false, contentType=application/json, __TypeId__=com.rabbitmq.stream.entity.Invoice, timestamp=1699113082704}]
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:151) ~[spring-messaging-6.0.13.jar:6.0.13]
at org.springframework.amqp.rabbit.listener.adapter.AmqpMessageHandlerMethodFactory$OptionalEmptyAwarePayloadArgumentResolver.resolveArgument(AmqpMessageHandlerMethodFactory.java:99) ~[spring-rabbit-3.0.10.jar:3.0.10]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) ~[spring-messaging-6.0.13.jar:6.0.13]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-6.0.13.jar:6.0.13]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-6.0.13.jar:6.0.13]
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:75) ~[spring-rabbit-3.0.10.jar:3.0.10]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:277) ~[spring-rabbit-3.0.10.jar:3.0.10]
... 21 common frames omitted
I think this is because my RabbitListenerContainerFactory<StreamListenerContainer> invoiceContainerFactory from configuration above does not inject any MessageConverter (should be Jackson2JsonMessageConverter, since the listenDefault works). But I can't find any method on the StreamRabbitListenerContainerFactory to inject the message converter. Does RabbitListenerContainerFactory<StreamListenerContainer> does not support JSON conversion?