Using the Micronaut RabbitMQ with a direct message exchange mechanism, while returning the value from the listener to the producer the JSON parser error appears as described below
08:37:58.637 [main] INFO io.micronaut.runtime.Micronaut - Startup completed in 8231ms. Server Running: http://localhost:8080
08:38:05.170 [pool-2-thread-4] ERROR i.m.h.n.stream.HttpStreamsHandler - Error occurred writing stream response: Error decoding JSON stream for type [T]: Unexpected token (START_OBJECT), expected END_ARRAY: Attempted to unwrap 'com.example.domain.ProductViewModel' value from an array (with `DeserializationFeature.UNWRAP_SINGLE_VALUE_ARRAYS`) but it contains more than one value
at [Source: (byte[])"[{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"na"[truncated 125 bytes]; line: 1, column: 80]
io.micronaut.core.serialize.exceptions.SerializationException: Error decoding JSON stream for type [T]: Unexpected token (START_OBJECT), expected END_ARRAY: Attempted to unwrap 'com.example.domain.ProductViewModel' value from an array (with `DeserializationFeature.UNWRAP_SINGLE_VALUE_ARRAYS`) but it contains more than one value
at [Source: (byte[])"[{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"na"[truncated 125 bytes]; line: 1, column: 80]
at io.micronaut.rabbitmq.serdes.JsonRabbitMessageSerDes.deserialize(JsonRabbitMessageSerDes.java:74)
at io.micronaut.rabbitmq.intercept.RabbitMQIntroductionAdvice.deserialize(RabbitMQIntroductionAdvice.java:323)
at io.micronaut.rabbitmq.intercept.RabbitMQIntroductionAdvice.lambda$intercept$22(RabbitMQIntroductionAdvice.java:268)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:132)
at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onNext(RxInstrumentedSubscriber.java:59)
at io.reactivex.internal.operators.flowable.FlowableTimeoutTimed$TimeoutSubscriber.onNext(FlowableTimeoutTimed.java:101)
at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onNext(RxInstrumentedSubscriber.java:59)
at io.reactivex.internal.subscriptions.DeferredScalarSubscription.complete(DeferredScalarSubscription.java:132)
at io.reactivex.internal.operators.single.SingleToFlowable$SingleToFlowableObserver.onSuccess(SingleToFlowable.java:62)
at io.micronaut.reactive.rxjava2.RxInstrumentedSingleObserver.onSuccess(RxInstrumentedSingleObserver.java:65)
at io.reactivex.internal.operators.single.SingleFlatMap$SingleFlatMapCallback$FlatMapSingleObserver.onSuccess(SingleFlatMap.java:111)
at io.micronaut.reactive.rxjava2.RxInstrumentedSingleObserver.onSuccess(RxInstrumentedSingleObserver.java:65)
at io.reactivex.internal.operators.single.SingleDoFinally$DoFinallyObserver.onSuccess(SingleDoFinally.java:73)
at io.micronaut.reactive.rxjava2.RxInstrumentedSingleObserver.onSuccess(RxInstrumentedSingleObserver.java:65)
at io.reactivex.internal.operators.single.SingleCreate$Emitter.onSuccess(SingleCreate.java:67)
at io.micronaut.rabbitmq.reactive.RxJavaReactivePublisher$3.handleDelivery(RxJavaReactivePublisher.java:324)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
Listener
@RabbitListener
public class ProductListener {
@Queue(ProductTopicConstants.GET_PRODUCTS)
public List<ProductViewModel> find(String text) {
List<ProductViewModel> value = new ArrayList<>();
value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
return value;
}
}
Producer
@RabbitClient(ProductTopicConstants.FETE_BIRD_EXCHANGE)
@RabbitProperty(name = "replyTo", value = "amq.rabbitmq.reply-to")
public interface IProductProducer {
@Binding(ProductTopicConstants.GET_PRODUCTS)
Flowable<ProductViewModel> find(String text);
}
Controller
@Get("/{text}")
public Flowable<ProductViewModel> Find(String text) {
return iproductProducer.find(text);
}
For the single list of item the exception doesn't appears, however, for more then 1 value the exception appears. Since I am using Flowable it should act like a LIST