Why does Vertx Event Bus block under high load?


I'm trying to send lots of messages over the Vert.x event bus (clustered with Hazelcast) without blocking, like this:

EventBus eb = vertx.eventBus();

// 100 periodic timers, each sending one message every 1 ms
for (int i = 0; i < 100; i++) {
  vertx.setPeriodic(1, num -> {
    eb.send("clusteredEndpoint", "ping");
  });
}

With fewer timers it works fine, but at around 100 timers I get the blocked-thread warning shown at the end of this question.

I'm wondering how to scale to 100K events/s without blocking. For reference, I wrote a Vert.x WebSocket test that exceeds this rate.
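The 100K figure follows directly from the timer setup: a timer with a 1 ms period fires at most 1000 times per second, so 100 timers nominally offer 100,000 sends per second, and the 1000-timer WebSocket test ten times that. A trivial sketch of the arithmetic; `OfferedLoad` is a hypothetical helper, not part of Vert.x:

```java
// Hypothetical helper (not Vert.x API): nominal send rate produced by N periodic timers.
class OfferedLoad {
    // Each timer fires once per periodMs, i.e. 1000 / periodMs times per second at most.
    static double sendsPerSecond(int timers, double periodMs) {
        return timers * (1000.0 / periodMs);
    }

    public static void main(String[] args) {
        System.out.println(sendsPerSecond(100, 1));  // event bus test: 100000.0
        System.out.println(sendsPerSecond(1000, 1)); // WebSocket test: 1000000.0
    }
}
```

These are upper bounds: when the event loop is busy, timers fire late and the achieved rate is lower.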

If it's not possible, I'd like to understand what's blocking. It looks like it's something in this class: https://github.com/eclipse-vertx/vert.x/blob/master/src/main/java/io/vertx/core/eventbus/impl/clustered/Serializer.java
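The `Serializer` frames in the stack trace at the end of this question (`Serializer.queue`, `SerializerQueue.add`, `SerializedTask.process`, then `selectForSend`) suggest a per-address queue that keeps sends to the same address in order while an asynchronous node-selection step is pending. The plain-Java model below is an assumption made for illustration, not Vert.x's actual code; `KeyedSerializer`, `queue` and `backlog` are hypothetical names. It shows the general technique: if the first async step for an address stalls, every later send to that address piles up behind it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical model (NOT Vert.x's implementation), meant for a single thread such as an
// event loop: tasks for the same key run strictly one after another, and the next task
// only starts once the previous task's future has completed.
class KeyedSerializer {
    private final Map<String, Queue<Supplier<CompletableFuture<?>>>> queues = new HashMap<>();
    private final Set<String> running = new HashSet<>();

    void queue(String key, Supplier<CompletableFuture<?>> task) {
        queues.computeIfAbsent(key, k -> new ArrayDeque<>()).add(task);
        if (running.add(key)) { // nothing in flight for this key: start immediately
            runNext(key);
        }
    }

    private void runNext(String key) {
        Supplier<CompletableFuture<?>> task = queues.get(key).poll();
        if (task == null) {     // queue drained: the key is idle again
            running.remove(key);
            return;
        }
        task.get().whenComplete((result, error) -> runNext(key));
    }

    // Number of tasks waiting behind the one currently in flight.
    int backlog(String key) {
        Queue<?> q = queues.get(key);
        return q == null ? 0 : q.size();
    }

    public static void main(String[] args) {
        KeyedSerializer s = new KeyedSerializer();
        List<String> log = new ArrayList<>();
        CompletableFuture<Void> stalled = new CompletableFuture<>(); // async step not finished yet
        s.queue("clusteredEndpoint", () -> { log.add("send-0"); return stalled; });
        for (int i = 1; i <= 3; i++) {
            int n = i;
            s.queue("clusteredEndpoint", () -> {
                log.add("send-" + n);
                return CompletableFuture.completedFuture(null);
            });
        }
        System.out.println(log + " backlog=" + s.backlog("clusteredEndpoint")); // [send-0] backlog=3
        stalled.complete(null); // the stalled step finishes; the backlog drains in order
        System.out.println(log + " backlog=" + s.backlog("clusteredEndpoint")); // [send-0, send-1, send-2, send-3] backlog=0
    }
}
```

The model only captures the ordering constraint. In the real trace the thread is stuck inside `selectForSend` / `Selectors.withSelector`, which is the step that picks the target node; the model says nothing about why that step is slow.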

For reference, this code does not block, even with 1000 timers:

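HttpClient client = vertx.createHttpClient();
client.webSocket(8080, "localhost", "/", res -> {
  if (res.failed()) {
    res.cause().printStackTrace();
    return;
  }
  WebSocket ws = res.result();
  // 1000 periodic timers, each writing one text frame every 1 ms
  for (int i = 0; i < 1000; i++) {
    vertx.setPeriodic(1, num -> ws.writeTextMessage("ping"));
  }
});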

Dec 15, 2020 10:54:38 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 36794 ms, time limit is 2000 ms
io.vertx.core.VertxException: Thread blocked
    at io.vertx.core.impl.future.FutureImpl.addListener(FutureImpl.java:140)
    at io.vertx.core.impl.future.PromiseImpl.addListener(PromiseImpl.java:23)
    at io.vertx.core.impl.future.FutureImpl.onComplete(FutureImpl.java:133)
    at io.vertx.core.impl.future.PromiseImpl.onComplete(PromiseImpl.java:23)
    at io.vertx.core.spi.cluster.impl.selector.Selectors.withSelector(Selectors.java:48)
    at io.vertx.core.spi.cluster.impl.DefaultNodeSelector.selectForSend(DefaultNodeSelector.java:42)
    at io.vertx.core.eventbus.impl.clustered.ClusteredEventBus$$Lambda$1065/195695453.accept(Unknown Source)
    at io.vertx.core.eventbus.impl.clustered.Serializer$SerializerQueue$SerializedTask.process(Serializer.java:147)
    at io.vertx.core.eventbus.impl.clustered.Serializer$SerializerQueue.checkPending(Serializer.java:94)
    at io.vertx.core.eventbus.impl.clustered.Serializer$SerializerQueue.add(Serializer.java:114)
    at io.vertx.core.eventbus.impl.clustered.Serializer.queue(Serializer.java:65)
    at io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.sendOrPub(ClusteredEventBus.java:172)
    at io.vertx.core.eventbus.impl.OutboundDeliveryContext.next(OutboundDeliveryContext.java:127)
    at io.vertx.core.eventbus.impl.EventBusImpl.sendOrPubInternal(EventBusImpl.java:394)
    at io.vertx.core.eventbus.impl.EventBusImpl.sendOrPubInternal(EventBusImpl.java:400)
    at io.vertx.core.eventbus.impl.EventBusImpl.send(EventBusImpl.java:103)
    at io.vertx.core.eventbus.impl.EventBusImpl.send(EventBusImpl.java:97)
    at io.vertx.example.EBtestClient.lambda$start$0(EBtestClient.java:22)
    at io.vertx.example.EBtestClient$$Lambda$1056/1487417027.handle(Unknown Source)
    at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:939)
    at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:910)
    at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:52)
    at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:294)
    at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:24)
    at io.vertx.core.impl.AbstractContext.emit(AbstractContext.java:49)
    at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:24)
    at io.vertx.core.impl.VertxImpl$InternalTimerHandler.run(VertxImpl.java:933)
    at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:176)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)


There are 2 answers

Charlie (best answer)

Here's my analysis after further investigation:

When the Vert.x event bus is used for remote communication, the consumer stops responding once it gets overwhelmed. This causes the producer to block, and I've captured three different blocked-thread warnings (see below). After the blocked-thread warning, this warning appears:

WARNING: No pong from server 2d1fb2ce-940f-4b60-bf60-39847f31bcaf - will consider it dead

So the answer to my question is that it doesn't matter "why" the producer blocks, because the consumer is effectively dead (it has hit some limit).

I'm surprised that Vert.x doesn't handle this more gracefully, for example by throwing an exception.
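One way to get fail-fast behaviour in application code is credit-based flow control: cap the number of messages in flight (sent but not yet acknowledged) and reject new sends once the cap is reached, instead of letting work pile up. This is a swapped-in technique, not something Vert.x does by itself; `InFlightWindow`, `trySend` and `onReply` are hypothetical names. In a real verticle, `onReply` would be called from the reply (or failure) handler of a request/reply exchange.

```java
import java.util.concurrent.Semaphore;

// Hypothetical credit-based window (not a Vert.x API): at most maxInFlight unacknowledged
// messages; further sends are rejected immediately instead of queueing without bound.
class InFlightWindow {
    private final Semaphore credits;

    InFlightWindow(int maxInFlight) {
        this.credits = new Semaphore(maxInFlight);
    }

    // Runs `send` and returns true if a credit is available; returns false ("back off") otherwise.
    boolean trySend(Runnable send) {
        if (!credits.tryAcquire()) {
            return false; // window full: the consumer is not keeping up
        }
        try {
            send.run();
        } catch (RuntimeException e) {
            credits.release(); // the send never happened, so give the credit back
            throw e;
        }
        return true;
    }

    // Call exactly once per successful trySend, when the reply (or a failure/timeout) arrives.
    void onReply() {
        credits.release();
    }

    int available() {
        return credits.availablePermits();
    }

    public static void main(String[] args) {
        InFlightWindow window = new InFlightWindow(2);
        int sent = 0, rejected = 0;
        for (int i = 0; i < 5; i++) {
            // a real producer would send "ping" inside this Runnable
            if (window.trySend(() -> { })) sent++; else rejected++;
        }
        System.out.println("sent=" + sent + " rejected=" + rejected); // sent=2 rejected=3
        window.onReply(); // one acknowledgement arrives and frees a credit
        System.out.println(window.trySend(() -> { })); // true
    }
}
```

Returning `false` rather than throwing lets the caller decide whether to drop, retry later, or raise an error; the key point is that the producer learns about the overload immediately.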

Blocking error #1

Thread blocked
    at io.vertx.core.impl.future.FutureImpl.addListener(FutureImpl.java:140)
    at io.vertx.core.impl.future.PromiseImpl.addListener(PromiseImpl.java:23)
    at io.vertx.core.impl.future.FutureImpl.onComplete(FutureImpl.java:133)
    at io.vertx.core.impl.future.PromiseImpl.onComplete(PromiseImpl.java:23)
    at io.vertx.core.spi.cluster.impl.selector.Selectors.withSelector(Selectors.java:48)
    ...

Blocking error #2

io.vertx.core.VertxException: Thread blocked
    at java.nio.charset.CharsetEncoder.<init>(CharsetEncoder.java:198)
    at java.nio.charset.CharsetEncoder.<init>(CharsetEncoder.java:233)
    at sun.nio.cs.UTF_8$Encoder.<init>(UTF_8.java:558)
    at sun.nio.cs.UTF_8$Encoder.<init>(UTF_8.java:554)
    at sun.nio.cs.UTF_8.newEncoder(UTF_8.java:72)

Blocking error #3

io.vertx.core.VertxException: Thread blocked
    at io.vertx.core.eventbus.impl.clustered.ConnectionHolder.writeMessage(ConnectionHolder.java:93)
    at io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.sendRemote(ClusteredEventBus.java:332)
    at io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.sendToNode(ClusteredEventBus.java:283)

Alexey Soshin

First of all, all 100 timers will run on the same thread, because Vert.x has thread affinity: handlers registered by a verticle run on that verticle's event-loop thread. If you want to avoid that, run them in separate verticles. But still, I doubt you have 100 CPUs, so there will be a lot of contention.
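The thread-affinity point can be illustrated without Vert.x. In this analogy (not Vert.x code) a single-threaded `ScheduledExecutorService` stands in for one event loop; `SingleLoopDemo` and `threadsUsed` are hypothetical names. All 100 periodic tasks registered on it run on the same thread, so they execute one after another, never in parallel.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Analogy only: one single-threaded scheduler plays the role of one event loop.
class SingleLoopDemo {
    // Schedules `tasks` periodic tasks (every 1 ms) on one thread and returns
    // the names of the threads that actually ran them.
    static Set<String> threadsUsed(int tasks) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch allRanOnce = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            AtomicBoolean firstRun = new AtomicBoolean(true);
            loop.scheduleAtFixedRate(() -> {
                names.add(Thread.currentThread().getName());
                if (firstRun.getAndSet(false)) {
                    allRanOnce.countDown(); // count each task only once
                }
            }, 0, 1, TimeUnit.MILLISECONDS);
        }
        try {
            allRanOnce.await(5, TimeUnit.SECONDS); // wait until every task has run at least once
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            loop.shutdownNow();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(threadsUsed(100).size()); // 1: every task shared the same thread
    }
}
```

Deploying several verticle instances, as suggested above, spreads the work over several event loops; this analogy deliberately models only the single-loop case.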

Second, all of them are set to fire every 1 ms, which means each one has to finish in about 10 microseconds (1 ms shared by 100 timers), and that includes the networking code, because you're using the clustered event bus.
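The 10 microsecond figure is the 1 ms period divided among the 100 timers that share one thread. A tiny sketch of that arithmetic; `TimeBudget` is a hypothetical helper:

```java
// Hypothetical helper: per-task time budget when `timers` tasks share one thread
// and each must run once every `periodMicros` for the thread to keep up.
class TimeBudget {
    static double microsPerTask(int timers, double periodMicros) {
        return periodMicros / timers;
    }

    public static void main(String[] args) {
        System.out.println(microsPerTask(100, 1000.0));  // 10.0
        System.out.println(microsPerTask(1000, 1000.0)); // 1.0
    }
}
```

The same arithmetic gives 1 microsecond per write for the 1000-timer WebSocket test.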

So the problem lies in how the test is written, not in what Vert.x is doing.

If you really want to test this kind of load (we're talking 100K rps here), spread your requests across multiple machines.

But even then, I'm not sure that Hazelcast is built to handle that kind of load.

If you would like to know what really blocks, my guess is this part of the code:

https://github.com/eclipse-vertx/vert.x/blob/master/src/main/java/io/vertx/core/spi/cluster/impl/DefaultNodeSelector.java#L43

I don't have a clustered Vert.x setup readily available, though, so I can't confirm whether my assumption is correct.