Vertx event bus slow consuming issue


We have a non-clustered Vert.x application, and we use the event bus for internal communication between verticles.

  • Verticle A consumes from the bus, performs an HTTP request, and sends the response back through the bus.

  • Verticle B only sends the request over the bus asking for that HTTP request to be performed.

The problem appears when Verticle B sends a "high" volume of requests. The consumer then receives events more and more slowly (presumably because they are being queued in the event bus). At 8 requests/second, an event takes up to 3-4 seconds to reach the consumer. At higher request rates it can take more than 30 seconds, so the event bus timeout is triggered.

The thing is, Verticle A performs the HTTP operation quickly (~200 ms), so I don't understand why the requests get stuck in the bus.
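A rough way to check the "200 ms should be fast enough" intuition is to compare the arrival rate with what the consumer can sustain. The sketch below is plain arithmetic, not Vert.x code, and the class and method names are hypothetical. By Little's law, 8 requests/second at 200 ms each keeps only about 1.6 calls in flight on average, so multi-second delays would suggest either that the calls are effectively running one at a time (one 200 ms call at a time sustains only 5 requests/second) or that the real latency under load is well above 200 ms.

```java
final class LoadEstimate {

    /** Average number of calls in flight (Little's law: arrival rate x time per call). */
    static double averageInFlight(double requestsPerSecond, double latencyMillis) {
        return requestsPerSecond * latencyMillis / 1000.0;
    }

    /** Highest request rate a stage can sustain with maxConcurrent parallel calls. */
    static double capacityPerSecond(int maxConcurrent, double latencyMillis) {
        return maxConcurrent * 1000.0 / latencyMillis;
    }

    public static void main(String[] args) {
        // Figures from the question: 8 requests/second, ~200 ms per HTTP call.
        System.out.println(averageInFlight(8, 200));   // 1.6 calls in flight on average
        System.out.println(capacityPerSecond(1, 200)); // 5.0 req/s if calls are serialized
        System.out.println(capacityPerSecond(2, 200)); // 10.0 req/s with two in parallel
    }
}
```

If the measured numbers do not fit this picture, timing the HTTP call itself under load is probably the next thing to check.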

We've tried several solutions, but none of them worked:

  • Deploy multiple instances of Verticle A as workers
  • Use vertx.executeBlocking() to perform the HTTP request

The only thing that worked was commenting out the HTTP request and returning a mock object through the bus. But again, the HTTP request doesn't take more than 200 ms, so it shouldn't be blocking the bus.

Additional information: We use an autogenerated REST client built on Retrofit + OkHttpClient. Due to company policy, we cannot use the Vert.x WebClient, so I didn't try that option.

EXAMPLE

This is a really simplified version of our code so you can check if I'm missing something.

VERTICLE A

// Instantiated in Verticle A
public class EmailSender {

    private final Vertx vertx;
    private final EmailApiClient emailApiClient;

    public EmailSender(Vertx vertx) {
        this.vertx = vertx;
        emailApiClient = ClientFactory.createEmailApiClient();
    }

    public void start() {
        vertx.eventBus().consumer("sendEmail", this::sendEmail);
    }

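    public void sendEmail(Message<EmailRequest> message) {
        EmailRequest emailRequest = message.body();
        emailApiClient.sendEmail(emailRequest).subscribe(
            response -> {
                if (response.code() == 200) {
                    EmailResponse emailResponse = response.body();
                    message.reply(emailResponse);
                } else {
                    message.fail(500, "Error sending email");
                }
            },
            // Without an error callback, a failed call never replies and the
            // requester only finds out when the bus timeout fires.
            error -> message.fail(500, "Error sending email: " + error.getMessage()));
    }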
}

VERTICLE B

// Instantiated in Verticle B
public class EmailCommunications {

    private final Vertx vertx;

    public EmailCommunications(Vertx vertx) {
        this.vertx = vertx;
    }

    public Single<EmailResponse> sendEmail(EmailRequest emailRequest) {
        SingleSubject<EmailResponse> emailSent = SingleSubject.create();
        // The explicit type argument makes the reply body an EmailResponse.
        vertx.eventBus().<EmailResponse>request(
            "sendEmail",
            emailRequest,
            busResult -> {
                if (busResult.succeeded()) {
                    emailSent.onSuccess(busResult.result().body());
                } else {
                    emailSent.onError(busResult.cause());
                }
            }
        );
        return emailSent;
    }
}

There is 1 answer

Diego Manuel Mateos Gómez

We fixed the issue by changing our OkHttpClient configuration so that HTTP requests no longer get stuck:


default void configureOkHttpClient(OkHttpClient.Builder okHttpClientBuilder) {
    // Keep up to 40 idle connections alive for 5 minutes so they can be reused.
    ConnectionPool connectionPool = new ConnectionPool(40, 5, TimeUnit.MINUTES);

    // Raise the limits on concurrent asynchronous calls
    // (OkHttp defaults: 64 in total, 5 per host).
    Dispatcher dispatcher = new Dispatcher();
    dispatcher.setMaxRequestsPerHost(200);
    dispatcher.setMaxRequests(200);

    okHttpClientBuilder
        .readTimeout(60, TimeUnit.SECONDS)
        .retryOnConnectionFailure(true)
        .connectionPool(connectionPool)
        .dispatcher(dispatcher);
}
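The answer does not say why this configuration helped. One plausible reading is that calls were queuing inside the HTTP client rather than in the event bus. OkHttp's Dispatcher caps concurrent asynchronous calls at 64 in total and 5 per host by default, and every call that holds a slot longer than expected delays the ones behind it. The sketch below is a pure simulation with no OkHttp involved, and its class and method names are hypothetical. It shows how a concurrency cap turns into waiting time when requests arrive faster than the cap lets them complete.

```java
import java.util.PriorityQueue;

final class CapQueueSimulation {

    /**
     * Returns the longest time (in ms) any request waits for a free slot when
     * requests arrive every interArrivalMillis, each one holds a slot for
     * serviceMillis, and at most maxConcurrent requests run at once.
     */
    static long maxWaitMillis(int requests, long interArrivalMillis,
                              long serviceMillis, int maxConcurrent) {
        // Each entry is the time at which one slot becomes free again.
        PriorityQueue<Long> slotFreeAt = new PriorityQueue<>();
        for (int i = 0; i < maxConcurrent; i++) {
            slotFreeAt.add(0L);
        }
        long maxWait = 0;
        for (int i = 0; i < requests; i++) {
            long arrival = i * interArrivalMillis;
            // Take the slot that frees up first; wait for it if it is still busy.
            long start = Math.max(arrival, slotFreeAt.poll());
            maxWait = Math.max(maxWait, start - arrival);
            slotFreeAt.add(start + serviceMillis);
        }
        return maxWait;
    }

    public static void main(String[] args) {
        // 5 seconds of traffic at 8 requests/second (one every 125 ms), 200 ms per call.
        System.out.println(maxWaitMillis(40, 125, 200, 1)); // 2925: serialized calls fall behind
        System.out.println(maxWaitMillis(40, 125, 200, 5)); // 0: five slots keep up easily
    }
}
```

With 200 ms calls, five slots are enough for 8 requests/second, so a cap of 5 would only explain the delays if calls took much longer under load or something else was serializing them. Measuring the actual call duration and the number of running calls would help confirm which limit was hit.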