How to enforce blocking for a vertx Redis stream write?


I have two microservices. One writes to a Redis stream and the other reads from it. For reasons outside of my control, I have to use the Vert.x Redis client to write to the stream. The problem is that the writing microservice runs synchronously only up to the point where it issues the write to Redis. The Vert.x Redis call itself is asynchronous, so the actual write can complete at any later time, which causes some unexpected behavior. For instance, please see below.

Microservice B

public void doSomething(){
   //stuff being done
   ...
   writeToRedis(message);
   logger.infof("finished doing stuff");
}

public void writeToRedis(String message){
  final Redis client = Redis.createClient(vertx, connectionString);

  try{
     client.connect()
       .onSuccess(conn ->
         conn.send(Request.cmd(Command.XADD).arg(stream).arg("*").arg("payload").arg(message))
           .onSuccess(response -> {
             logger.infof("Message successfully written");
             conn.close();
             client.close();
           })
           .onFailure(e -> logger.error("something bad happened", e)));
  } catch(InterruptedException | ExecutionException e){
     if(Thread.interrupted()){
        Thread.currentThread().interrupt();
        throw new CustomException(e);
     } else{
        throw new CustomException(e);
     }
  }
}

In the code above, the "finished doing stuff" log message can be written before the message is actually written to Redis. I don't want this to happen. I know Vert.x does not want you to block the event loop, but in this case I need to wait until the message is written. I am not sure how to get around this.

There are 2 answers

Oliver Marienfeld (best answer)

There is no need to block the event loop. Instead of void, the writeToRedis method should return e.g. Future<Void>:

public Future<Void> writeToRedis(String message) {
    final Redis client = Redis.createClient(vertx, connectionString);

    return client.connect()
        // compose() makes the returned Future wait for the XADD, not just the connect
        .compose(conn ->
            conn.send(Request.cmd(Command.XADD).arg(stream).arg("*").arg("payload").arg(message))
                .onComplete(ar -> conn.close()))
        .onSuccess(response -> logger.infof("Message successfully written"))
        .onFailure(e -> logger.error("something bad happened", e))
        .onComplete(ar -> client.close())
        // discard the Redis response so the result type is Future<Void>
        .mapEmpty();
}

Then, you can make doSomething() act on that Future's completion:

public void doSomething(){
    //stuff being done
    ...
    writeToRedis(message).onSuccess(v ->
        logger.infof("finished doing stuff"));
}

That’s it, no need to block! By the way, you should not need the exception-handling code in the writeToRedis method.
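The ordering guarantee can be tried out without a Redis server. The sketch below is only an illustration: it uses the JDK's CompletableFuture as a stand-in for the Vert.x Future, with thenRun playing the role of onSuccess. The class name ChainedWriteSketch and the helper fakeAsyncWrite are hypothetical and not part of any library.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class ChainedWriteSketch {

    // Hypothetical stand-in for writeToRedis(): the future completes once the "write" is done.
    static CompletableFuture<Void> fakeAsyncWrite(List<String> log) {
        return CompletableFuture.runAsync(() -> log.add("message written"));
    }

    // Returns a future of the event log so callers can observe the order of events.
    static CompletableFuture<List<String>> doSomething() {
        List<String> log = new CopyOnWriteArrayList<>();
        return fakeAsyncWrite(log)
            // Runs only after the write completed normally, similar to onSuccess.
            .thenRun(() -> log.add("finished doing stuff"))
            .thenApply(v -> log);
    }

    public static void main(String[] args) {
        // join() is used here only so the demo can print the result before exiting.
        System.out.println(doSomething().join());
    }
}
```

Because the second log entry is attached as a continuation of the write, it cannot be recorded before the write finishes, and no thread has to wait in doSomething() itself.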

Trevor

After some research and reaching out to the Quarkus team, the suggested solution was to either:

  1. Use the Mutiny variant of the client and call .await().atMost(…)

  2. Or call toCompletionStage().toCompletableFuture().join() on the Vert.x Future
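A minimal sketch of what option 2 does, assuming the caller is not running on a Vert.x event loop thread (for example, it runs on a worker thread). To stay runnable without Vert.x or Redis, a plain CompletableFuture stands in for the result of toCompletionStage().toCompletableFuture(). The class name BlockingWriteSketch and the helper fakeAsyncWrite are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

class BlockingWriteSketch {

    // Hypothetical stand-in for
    // writeToRedis(message).toCompletionStage().toCompletableFuture()
    static CompletableFuture<Void> fakeAsyncWrite(List<String> log) {
        return CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(50); // simulate network latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            log.add("message written");
        });
    }

    static List<String> doSomething() {
        List<String> log = new CopyOnWriteArrayList<>();
        // join() blocks the calling thread until the asynchronous write has completed.
        fakeAsyncWrite(log).join();
        log.add("finished doing stuff");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(doSomething());
    }
}
```

Unlike chaining a callback, this parks the calling thread for the duration of the write, so it is only appropriate where blocking that thread is acceptable.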