Project Reactor timeout handling


I have three questions about Project Reactor, listed below. First, here is my code (simplified to make the issue easier to understand):

Mono<Integer> doWithSession(Function<String, Mono<Integer>> callback, long timeout) {
  return Mono.just("hello")
        .compose(monostr -> monostr
            .doOnSuccess(str -> System.out.println("Suppose I want to release session here after all")) //(1)
            .doOnCancel(() -> System.out.println("cancelled")) //(2)
            .then(callback::apply)
            .timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout)))
        );
}

And the test:

@Test
public void testDoWithSession2() throws Exception {
  Function<String, Mono<Integer>> fun1 = str -> Mono.fromCallable(() -> {
    System.out.println("do some long timed work");
    try {
      Thread.sleep(5000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("work has completed");
    return str.length();
  });

  StepVerifier.create(doWithSession(fun1,1000))
    .verifyError(TimeoutException.class);
}

My questions:

  1. How can I interrupt the invocation of fun1 and return the error immediately? (Maybe I'm doing something wrong, but the error seems to be returned only after the callback has finished, not when the timeout elapses.)
  2. Why are doOnSuccess and doOnCancel both invoked? (I expected either (1) or (2) to be invoked, not both.)
  3. How should I handle the following case:
    • imagine that Mono.just("hello") in the code acquires a connection;
    • in the callback I do something with the connection and get a result (a Mono<Integer> in my case);
    • at the end (on success or on failure) I want to release the session (I try to do this in (1)).

There are 2 answers

Simon Baslé (best answer)

1) As you found out, use .publishOn(Schedulers.single()). This ensures the callable is invoked on another thread and blocks only that thread. It also allows the callable to be cancelled.

2) The order of the operators in your chain matters. You put .doOnSuccess at the beginning of the compose (which you don't really need in this particular example, by the way, unless you want to extract that compose function for reuse later). That means it receives its signals from Mono.just and runs as soon as the source is queried, before your processing has taken place. The same goes for doOnCancel: the cancellation comes from the timeout triggering.

3) There is a factory method that creates a sequence from a resource and ensures that the resource is cleaned up: Mono.using. It would look something like this:

public <T> Mono<T> doWithConnection(Function<String, Mono<T>> callback, long timeout) {
    return Mono.using(
            //the resource supplier:
            () -> {
                System.out.println("connection acquired");
                return "hello";
            },
            //create a Mono out of the resource. On any termination, the resource is cleaned up
            connection -> Mono.just(connection)
                              //the blocking callable needs own thread:
                              .publishOn(Schedulers.single())
                              //execute the callable and get result...
                              .then(callback::apply)
                              //...but cancel if it takes too long
                              .timeoutMillis(timeout)
                              //for demonstration we'll log when timeout triggers:
                              .doOnError(TimeoutException.class, e -> System.out.println("timed out")),
            //the resource cleanup:
            connection -> System.out.println("cleaned up " + connection));
}

That returns a Mono<T> of the callable's T value. In production code, you'd subscribe to it to deal with the value. In a test, StepVerifier will subscribe for you when you call one of its verify methods.

Let's demonstrate that with your long-running task and see what it outputs:

@Test
public void testDoWithSession2() throws Exception {
    Function<String, Mono<Integer>> fun1 = str -> Mono.fromCallable(() -> {
        System.out.println("start some long timed work");
        //for demonstration we'll print some clock ticks
        for (int i = 1; i <= 5; i++) {
            try {
                Thread.sleep(1000);
                System.out.println(i + "s...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("work has completed");
        return str.length();
    });

    //let two ticks show up
    StepVerifier.create(doWithConnection(fun1,2100))
                .verifyError(TimeoutException.class);
}

This outputs:

connection acquired
start some long timed work
1s...
2s...
timed out
cleaned up hello

And if we set the timeout above 5000, we get the following (there's an assertion error because the StepVerifier expects a timeout):

connection acquired
start some long timed work
1s...
2s...
3s...
4s...
5s...
work has completed
cleaned up hello

java.lang.AssertionError: expectation "expectError(Class)" failed (expected: onError(TimeoutException); actual: onNext(5)
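
Point 2 above (the position of a callback in the chain decides which signals it sees) can be illustrated without Reactor. The following is only an analogy built on the JDK's CompletableFuture, not Reactor code, and it does not model cancellation. The class name OperatorOrderSketch and the event labels are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class OperatorOrderSketch {
    // Returns the order in which the callbacks fired.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        CompletableFuture.completedFuture("hello")
            // Attached to the source stage: fires as soon as "hello" is available.
            .whenComplete((s, err) -> events.add("upstream callback"))
            // Hypothetical stand-in for the user's processing step.
            .thenApply(s -> { events.add("processing"); return s.length(); })
            // Attached after the processing: fires only once it has finished.
            .whenComplete((n, err) -> events.add("downstream callback"))
            .join();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The callback attached right after the source runs before the processing step, which mirrors why a doOnSuccess placed directly after Mono.just fires before the callback has done its work.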

Andrii Pischanski

For the 1st question, it looks like the answer is to use schedulers:

Mono<Integer> doWithSession(Function<String, Mono<Integer>> callback, long timeout) {
    Scheduler single = Schedulers.single();
    return Mono.just("hello")
            .compose(monostr -> monostr
                    .publishOn(single) // use scheduler
                    .then(callback::apply)
                    .timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout)))
            );
}
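
Why the scheduler helps can be seen without Reactor at all. The following is a plain-JDK sketch, an analogy rather than a description of what Reactor does internally: the blocking work runs on a worker thread, the caller waits at most the timeout, and on timeout the worker is interrupted. The names TimeoutSketch, slowLength and callWithTimeout are made up for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

class TimeoutSketch {
    // Set to true if the blocking work was interrupted before it finished.
    static final AtomicBoolean interrupted = new AtomicBoolean(false);

    // Hypothetical stand-in for the blocking callback from the question.
    static int slowLength(String s) throws InterruptedException {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            interrupted.set(true);
            throw e; // stop working instead of swallowing the interrupt
        }
        return s.length();
    }

    // Runs the work on a worker thread and waits at most timeoutMillis for it.
    static int callWithTimeout(String s, long timeoutMillis) throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Future<Integer> future = worker.submit(() -> slowLength(s));
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the worker thread
            throw e;
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            callWithTimeout("hello", 1000);
        } catch (TimeoutException e) {
            System.out.println("timed out; the caller is free again");
        }
    }
}
```

If the work ran on the caller's own thread, nothing could observe the timeout until the work returned. Note also that the work only stops early here because it lets the InterruptedException propagate; a task that swallows the interrupt simply continues with whatever comes next.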

The 3rd question could be solved this way:

private Mono<Integer> doWithSession3(Function<String, Mono<Integer>> callback, long timeout) {
    Scheduler single = Schedulers.single();
    return Mono.just("hello")
            .then(str -> Mono.just(str) // here wrapping our string to new Mono
                    .publishOn(single)
                    .then(callback::apply)
                    .timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout)))
                    .doAfterTerminate((res, throwable) -> System.out.println("Do anything with your string " + str))
            );
}
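
Both answers to the 3rd question rely on a cleanup step that runs however the work ends. As a rough plain-JDK sketch of that acquire/use/release shape (an analogy only: it is synchronous and does not model cancellation by a timeout, which the Mono.using example above does handle), with UsingSketch and using being names invented for this illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

class UsingSketch {
    // Acquire a resource, use it, and release it however the use ends.
    static <R, T> T using(Supplier<R> acquire, Function<R, T> use, Consumer<R> release) {
        R resource = acquire.get();
        try {
            return use.apply(resource);
        } finally {
            release.accept(resource); // runs on normal return and on exception
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // Successful use: the resource is released after the result is computed.
        int n = using(() -> "hello", String::length, r -> log.add("released " + r));
        System.out.println(n + " " + log);
        // Failing use: the resource is still released before the exception escapes.
        try {
            using(() -> "hello",
                  r -> { throw new IllegalStateException("work failed"); },
                  r -> log.add("released " + r));
        } catch (IllegalStateException e) {
            System.out.println("failed, log = " + log);
        }
    }
}
```

The point of putting the release in one place is that the caller's function never has to remember it, whether it succeeds or fails.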