I have three questions related to Project Reactor and I will ask them below. Start with the code that I have (it will be simplified to easier understand the issue).
Mono<Integer> doWithSession(Function<String, Mono<Integer>> callback, long timeout) {
return Mono.just("hello")
.compose(monostr -> monostr
.doOnSuccess(str -> System.out.println("Suppose I want to release session here after all")) //(1)
.doOnCancel(() -> System.out.println("cancelled")) //(2)
.then(callback::apply)
.timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout)))
);
}
And test:
@Test
public void testDoWithSession2() throws Exception {
Function<String, Mono<Integer>> fun1 = str -> Mono.fromCallable(() -> {
System.out.println("do some long timed work");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("work has completed");
return str.length();
});
StepVerifier.create(doWithSession(fun1,1000))
.verifyError(TimeoutException.class);
}
So and questions:
- how to interrupt invocation of
fun1
and immediately return error? (maybe I'm doing something wrong but it looks error returns not after timeout but after all invocation of callback) - why
doOnSuccess
anddoOnCancel
invoked at the same time? (I expected that (1) OR (2) will be invoked but not the both) - And how to deal with the following case:
- imagine that in code
Mono.just("hello")
is acquiring connection; - in
callback
I'm doing something with connection and getting some result (Mono<Integer>
in my case); - at the end (on success or on failure) I want to release session (I try to do this in (1)).
- imagine that in code
1) As you found out, use
.publishOn(Schedulers.single())
. This will ensure the callable is invoked on another thread and only blocks said thread. Plus it will allow the callable to be cancelled.2) The order of your chain matters. You put
.doOnSuccess
at the beginning of thecompose
(which you don't really need for that particular example by the way, unless you want to extract that compose function for reuse later). So it means that it gets notifications from theMono.just
basically, and runs as soon as the source is queried, even before your processing has taken place... Same fordoOnCancel
. The cancellation comes from thetimeout
triggering...3) There's an factory to create a sequence out of a resource and ensure that resource is cleaned up:
Mono.using
. So it would look something like that:That returns a
Mono<T>
of the callable's T value. In production code, you'd subscribe to it to deal with the value. In test,StepVerifier.create()
will subscribe for you.Let's demonstrate that with your long running task and see what it outputs:
This outputs:
And if we put the timeout over 5000, we get the following. (there' an assertion error because the StepVerifier expects a timeout):