Exception when no consumers set in Reactive messaging in Quarkus

363 views Asked by At

I followed the official Quarkus messaging guide and create a simple example to taste the Reactive messaging feature using AMQP(Apache Artemis).

The complete code is here.

The example is working but a small issue there, I have to start a curl to consume the message firstly, then use another curl to send messages.

// start consumer side.
curl http://localhost:8080/messages -H "Accept:text/event-stream"

// start sending.
curl http://localhost:8080/messages -d "Hello, Quarkus" -H "Content-Type:text/plain"

// then the consumer exit.
// and sending a message will cause an exception.


If cancel the consumer, and then sending messages, there is an exception thrown.

2020-10-12 20:18:54,137 WARN  [io.net.cha.AbstractChannelHandlerContext] (vert.x-eventloop-thread-6) Failed to mark a promise as failure because it has failed already: DefaultChannelPromise@4bda4836(failure: java.nio.channels.ClosedChannelException), unnotified cause: java.nio.channels.ClosedChannelException
    at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
    at io.vertx.core.net.impl.ConnectionBase.write(ConnectionBase.java:124)
    at io.vertx.core.net.impl.ConnectionBase.lambda$queueForWrite$2(ConnectionBase.java:215)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:832)
: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
    at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
    at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
    at io.netty.handler.codec.http.DefaultHttpContent.release(DefaultHttpContent.java:92)
    at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:867)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
    at io.vertx.core.net.impl.ConnectionBase.write(ConnectionBase.java:124)
    at io.vertx.core.net.impl.ConnectionBase.lambda$queueForWrite$2(ConnectionBase.java:215)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:832)

If it is possible to cache the messages in Apache Artemis even there is no consumer connected.

1

There are 1 answers

0
Domenico Francesco Bruscino On

Apache ActiveMQ Artemis has a powerful and flexible addressing model. If your scenario is a Point-to-point messaging you could define your address customizing the broker configuration, ie:

<address name="messages">
    <anycast>
        <queue name="messages" />
    </anycast>
</address>

The broker will create at startup all addresses and queues defined in the configuration, so when the sender will send a message it will be routed to an existent queue.