I'm trying to send lots of messages over the Vertx event bus like this (Clustered with Hazelcast) without blocking:
EventBus eb = vertx.eventBus();
for (int i = 0; i < 100; i++) {
vertx.setPeriodic(1, num -> {
eb.send("clusteredEndpoint", "ping");
});
}
When the number of timers is smaller it works fine but at around 100 timers I get this error.
I'm wondering how to scale to 100K events/s without blocking (for reference I wrote a Vertx WebSocket test that could exceed this number).
If it's not possible I'd like to understand what's blocking - looks like it's something in this class: https://github.com/eclipse-vertx/vert.x/blob/master/src/main/java/io/vertx/core/eventbus/impl/clustered/Serializer.java
For references - this code does not block - even with 1000 timers:
HttpClient client = vertx.createHttpClient();
client.webSocket(8080, "localhost", "/", res -> {
for (int i = 0; i < 1000; i++) {
vertx.setPeriodic(1, num -> {
res.result().writeTextMessage("ping");
});
}
});
});
Dec 15, 2020 10:54:38 AM io.vertx.core.impl.BlockedThreadChecker WARNING: Thread Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 36794 ms, time limit is 2000 ms io.vertx.core.VertxException: Thread blocked at io.vertx.core.impl.future.FutureImpl.addListener(FutureImpl.java:140) at io.vertx.core.impl.future.PromiseImpl.addListener(PromiseImpl.java:23) at io.vertx.core.impl.future.FutureImpl.onComplete(FutureImpl.java:133) at io.vertx.core.impl.future.PromiseImpl.onComplete(PromiseImpl.java:23) at io.vertx.core.spi.cluster.impl.selector.Selectors.withSelector(Selectors.java:48) at io.vertx.core.spi.cluster.impl.DefaultNodeSelector.selectForSend(DefaultNodeSelector.java:42) at io.vertx.core.eventbus.impl.clustered.ClusteredEventBus$$Lambda$1065/195695453.accept(Unknown Source) at io.vertx.core.eventbus.impl.clustered.Serializer$SerializerQueue$SerializedTask.process(Serializer.java:147) at io.vertx.core.eventbus.impl.clustered.Serializer$SerializerQueue.checkPending(Serializer.java:94) at io.vertx.core.eventbus.impl.clustered.Serializer$SerializerQueue.add(Serializer.java:114) at io.vertx.core.eventbus.impl.clustered.Serializer.queue(Serializer.java:65) at io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.sendOrPub(ClusteredEventBus.java:172) at io.vertx.core.eventbus.impl.OutboundDeliveryContext.next(OutboundDeliveryContext.java:127) at io.vertx.core.eventbus.impl.EventBusImpl.sendOrPubInternal(EventBusImpl.java:394) at io.vertx.core.eventbus.impl.EventBusImpl.sendOrPubInternal(EventBusImpl.java:400) at io.vertx.core.eventbus.impl.EventBusImpl.send(EventBusImpl.java:103) at io.vertx.core.eventbus.impl.EventBusImpl.send(EventBusImpl.java:97) at io.vertx.example.EBtestClient.lambda$start$0(EBtestClient.java:22) at io.vertx.example.EBtestClient$$Lambda$1056/1487417027.handle(Unknown Source) at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:939) at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:910) at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:52) at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:294) at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:24) at io.vertx.core.impl.AbstractContext.emit(AbstractContext.java:49) at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:24) at io.vertx.core.impl.VertxImpl$InternalTimerHandler.run(VertxImpl.java:933) at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:176) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748)
Here's my analysis after further investigation:
When using the Vertx event bus for remote communication once the consumer gets overwhelmed it stops responding. This causes the producer to block and I've captured 3 different blocking messages (see below). After the blocking warning there's this warning:
The answer to my question is that it doesn't matter "why" it's blocking because it's dead (because it's hit a certain limit).
I'm surprised that Vert.x doesn't handle this more gracefully - like maybe throwing an exception.
Blocking error #1
Thread blocked at io.vertx.core.impl.future.FutureImpl.addListener(FutureImpl.java:140) at io.vertx.core.impl.future.PromiseImpl.addListener(PromiseImpl.java:23) at io.vertx.core.impl.future.FutureImpl.onComplete(FutureImpl.java:133) at io.vertx.core.impl.future.PromiseImpl.onComplete(PromiseImpl.java:23) at io.vertx.core.spi.cluster.impl.selector.Selectors.withSelector(Selectors.java:48) at
Blocking error #2
io.vertx.core.VertxException: Thread blocked at java.nio.charset.CharsetEncoder.(CharsetEncoder.java:198) at java.nio.charset.CharsetEncoder.(CharsetEncoder.java:233) at sun.nio.cs.UTF_8$Encoder.(UTF_8.java:558) at sun.nio.cs.UTF_8$Encoder.(UTF_8.java:554) at sun.nio.cs.UTF_8.newEncoder(UTF_8.java:72)
Blocking error #3
io.vertx.core.VertxException: Thread blocked at io.vertx.core.eventbus.impl.clustered.ConnectionHolder.writeMessage(ConnectionHolder.java:93) at io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.sendRemote(ClusteredEventBus.java:332) at io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.sendToNode(ClusteredEventBus.java:283)