In which thread do CompletableFuture's completion handlers execute?


I have a question about this CompletableFuture method:

public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)

The Javadoc says only this:

Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function. See the CompletionStage documentation for rules covering exceptional completion.

What about threading? In which thread is this going to be executed? What if the future is completed by a thread pool?


There are 5 answers

Answer by Naman (best answer)

The policies as specified in the CompletableFuture docs could help you understand better:

  • Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

  • All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task). To simplify monitoring, debugging, and tracking, all generated asynchronous tasks are instances of the marker interface CompletableFuture.AsynchronousCompletionTask.
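As a rough illustration of the second policy, here is a minimal sketch that reports which thread runs a function passed to thenApplyAsync when no Executor is given. The class and method names (DefaultAsyncThreadDemo, asyncThreadName) are made up for this example. The exact thread name depends on the JVM and on the common pool's parallelism, so the sketch prints it rather than assuming it.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of any library.
class DefaultAsyncThreadDemo {

    /** Returns the name of the thread that ran a thenApplyAsync function with no Executor. */
    static String asyncThreadName() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("value");
        // No Executor argument: the task is handed to the default asynchronous
        // execution facility described in the policy above.
        return done.thenApplyAsync(v -> Thread.currentThread().getName()).join();
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("async fn: " + asyncThreadName());
    }
}
```

Even though the source future is already complete here, the function is not run by the calling thread, which is the practical difference from plain thenApply.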

Update: I would also advise reading this answer by @Mike, an interesting analysis that goes further into the details of the documentation.

Answer by John Kugelman

When it comes to threading the API documentation is lacking. It takes a bit of inference to understand how threading and futures work. Start with one assumption: the non-Async methods of CompletableFuture do not spawn new threads on their own. Work will proceed under existing threads.

The function passed to thenApply will run on whichever thread completes the original CompletableFuture. That's either the thread that calls complete(), or the one that calls thenApply() if the future is already completed. If you want control over the thread (a good idea if fn is a slow operation), then you should use thenApplyAsync with an explicit Executor.
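A minimal sketch of taking control of the thread with thenApplyAsync and an explicit Executor. The class name, method name, and the thread name "slow-fn-worker" are all invented for illustration; any Executor would do.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of any library.
class ExplicitExecutorDemo {

    /** Runs fn on a dedicated single-thread executor and returns that thread's name. */
    static String runOnNamedExecutor() {
        // Single worker thread with a recognizable (made-up) name.
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "slow-fn-worker"));
        try {
            CompletableFuture<Integer> source = CompletableFuture.completedFuture(21);
            // The two-argument overload submits fn to the given Executor.
            return source
                    .thenApplyAsync(v -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown(); // let the worker thread exit once the task is done
        }
    }

    public static void main(String[] args) {
        System.out.println("fn ran on: " + runOnNamedExecutor());
    }
}
```

With the executor passed explicitly, the thread that runs fn no longer depends on who completed the future or who registered the function.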

Answer by NPE

From the Javadoc:

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

More concretely:

  • fn will run during the call to complete() in the context of whichever thread has called complete().

  • If complete() has already finished by the time thenApply() is called, fn will be run in the context of the thread calling thenApply().
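The two cases above can be observed with a small sketch like the following. The class and method names and the thread name "completer" are assumptions made for this example. Note that this shows behavior typically seen with OpenJDK in an uncontended situation; the Javadoc wording ("may be performed by") does not promise it.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of any library.
class CompletingThreadDemo {

    /** fn is registered first; complete() is then called from a thread named "completer". */
    static String registeredBeforeComplete() {
        CompletableFuture<String> f = new CompletableFuture<>();
        CompletableFuture<String> ranOn =
                f.thenApply(v -> Thread.currentThread().getName());
        // The completing thread is started only after fn has been registered.
        new Thread(() -> f.complete("done"), "completer").start();
        return ranOn.join();
    }

    /** The future is already complete when thenApply() is called. */
    static String registeredAfterComplete() {
        CompletableFuture<String> f = CompletableFuture.completedFuture("done");
        return f.thenApply(v -> Thread.currentThread().getName()).join();
    }

    public static void main(String[] args) {
        System.out.println("registered before complete(): " + registeredBeforeComplete());
        System.out.println("registered after complete():  " + registeredAfterComplete());
    }
}
```

The first method avoids any race between registration and completion on purpose; once several threads register and complete at around the same time, which thread runs fn is much harder to predict.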

Answer by Mike Strobel

As @nullpointer points out, the documentation tells you what you need to know. However, the relevant text is surprisingly vague, and some of the comments (and answers) posted here seem to rely on assumptions that aren't supported by the documentation. Thus, I think it's worthwhile to pick it apart. Specifically, we should read this paragraph very carefully:

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

Sounds straightforward enough, but it's light on details. It seems to deliberately avoid describing when a dependent completion may be invoked on the completing thread versus during a call to a completion method like thenApply. As written, the paragraph above is practically begging us to fill in the gaps with assumptions. That's dangerous, especially when the topic concerns concurrent and asynchronous programming, where many of the expectations we've developed as programmers get turned on their heads. Let's take a careful look at what the documentation doesn't say.

The documentation does not claim that dependent completions registered before a call to complete() will run on the completing thread. Moreover, while it states that a dependent completion might be invoked when calling a completion method like thenApply, it does not state that a completion will be invoked on the thread that registers it (note the words "any other").

These are potentially important points for anyone using CompletableFuture to schedule and compose tasks. Consider this sequence of events:

  1. Thread A registers a dependent completion via f.thenApply(c1).
  2. Some time later, Thread B calls f.complete().
  3. Around the same time, Thread C registers another dependent completion via f.thenApply(c2).

Conceptually, complete() does two things: it publishes the result of the future, and then it attempts to invoke dependent completions. Now, what happens if Thread C runs after the result value is posted, but before Thread B gets around to invoking c1? Depending on the implementation, Thread C may see that f has completed, and it may then invoke c1 and c2. Alternatively, Thread C may invoke c2 while leaving Thread B to invoke c1. The documentation does not rule out either possibility. With that in mind, here are assumptions that are not supported by the documentation:

  1. That a dependent completion c registered on f prior to completion will be invoked during the call to f.complete();
  2. That c will have run to completion by the time f.complete() returns;
  3. That dependent completions will be invoked in any particular order (e.g., order of registration);
  4. That dependent completions registered before f completes will be invoked before completions registered after f completes.
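To make the point about ordering concrete, here is a small sketch that records the order in which two dependents of the same future are actually invoked. The class and method names are invented for this example. It deliberately makes no claim about what the order will be: registration order is c1 then c2, and the observed order is whatever the JDK in use happens to do.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class, not part of any library.
class CompletionOrderDemo {

    /** Registers c1 then c2 on the same future and returns the order they were invoked in. */
    static List<String> invocationOrder() {
        List<String> order = new CopyOnWriteArrayList<>();
        CompletableFuture<String> f = new CompletableFuture<>();
        CompletableFuture<Void> d1 = f.thenAccept(v -> order.add("c1"));
        CompletableFuture<Void> d2 = f.thenAccept(v -> order.add("c2"));
        f.complete("done");
        // Wait on the dependent stages themselves instead of assuming
        // both actions have finished by the time complete() returned.
        CompletableFuture.allOf(d1, d2).join();
        return order;
    }

    public static void main(String[] args) {
        // The printed order is implementation-specific and need not match
        // registration order; code should not depend on it.
        System.out.println("invocation order: " + invocationOrder());
    }
}
```

If one action must run after another, chaining it on the stage returned by the first (rather than registering both on f) expresses that dependency explicitly.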

Consider another example:

  1. Thread A calls f.complete();
  2. Some time later, Thread B registers a completion via f.thenApply(c1);
  3. Around the same time, Thread C registers a separate completion via f.thenApply(c2).

If it is known that f has already run to completion, one might be tempted to assume that c1 will be invoked during f.thenApply(c1) and that c2 will be invoked during f.thenApply(c2). One might further assume that c1 will have run to completion by the time f.thenApply(c1) returns. However, the documentation does not support these assumptions. It may be possible that one of the threads calling thenApply ends up invoking both c1 and c2, while the other thread invokes neither.
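One way to avoid leaning on these assumptions is to wait on the stage that thenApply returns, since its completion is what actually signals that the function has run. A minimal sketch, with a made-up class and method name:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of any library.
class WaitOnDependentDemo {

    /** Applies String::length to f's result and waits for that dependent stage. */
    static int computeLength(CompletableFuture<String> f) {
        CompletableFuture<Integer> dependent = f.thenApply(String::length);
        // join() returns only once the dependent stage has completed, no matter
        // which thread ran the function or when thenApply() itself returned.
        return dependent.join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> f = new CompletableFuture<>();
        new Thread(() -> f.complete("hello")).start();
        System.out.println("length: " + computeLength(f));
    }
}
```

This relies only on the documented contract of the returned stage, not on the timing of complete() or thenApply().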

A careful analysis of the JDK code could determine how the hypothetical scenarios above might play out. But even that is risky, because you may end up relying on an implementation detail that is (1) not portable, or (2) subject to change. Your best bet is not to assume anything that's not spelled out in the javadocs or the original JSR spec.

TL;DR: Be careful what you assume, and when you write documentation, be as clear and deliberate as possible. While brevity is a wonderful thing, be wary of the human tendency to fill in the gaps.

Answer by ALargeTom

I know this question is old, but I want to use the source code to answer it.

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}

private CompletableFuture<Void> uniAcceptStage(Executor e,
                                               Consumer<? super T> f) {
    if (f == null) throw new NullPointerException();
    Object r;
    if ((r = result) != null)
        return uniAcceptNow(r, e, f);
    CompletableFuture<Void> d = newIncompleteFuture();
    unipush(new UniAccept<T>(e, d, this, f));
    return d;
}

This is the source code from Java 16. When we call thenAccept, a null Executor is passed to uniAcceptStage(). In uniAcceptStage(), the second if checks whether a result is already present. If result is not null, it calls uniAcceptNow(), which contains:

if (e != null) {
     e.execute(new UniAccept<T>(null, d, this, f));
} else {
     @SuppressWarnings("unchecked") T t = (T) r;
     f.accept(t);
     d.result = NIL;
}

If the executor is null, the action is invoked directly via f.accept(t) on the current thread. So if we call thenApply/thenAccept from the main thread on an already-completed future, the function runs on the main thread.

But if the previous CompletableFuture has no result yet, the current UniAccept/UniApply is pushed onto that future's stack by unipush(). UniAccept has a tryFire() method, which is called from postComplete(), on whichever thread runs postComplete():

final void postComplete() {
    /*
     * On each step, variable f holds current dependents to pop
     * and run.  It is extended along only one path at a time,
     * pushing others to avoid unbounded recursion.
     */
    CompletableFuture<?> f = this; Completion h;
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        if (STACK.compareAndSet(f, h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                NEXT.compareAndSet(h, t, null); // try to detach
            }
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}
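The "run now if a result exists, otherwise push and let the completer pop" logic in the excerpts above can be modeled with a toy class. This is a deliberately simplified sketch, not the JDK implementation: ToyFuture and its members are invented names, it has no executors or exception handling, and it treats null as "no result", so null values are not supported.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Toy model only (hypothetical class): loosely mirrors the push / pop-and-fire idea.
final class ToyFuture<T> {
    private static final class Node<T> {
        final Consumer<? super T> action;
        Node<T> next;
        Node(Consumer<? super T> action) { this.action = action; }
    }

    private final AtomicReference<T> result = new AtomicReference<>();
    private final AtomicReference<Node<T>> stack = new AtomicReference<>();

    /** Runs the action now if a result exists; otherwise pushes it onto the stack. */
    void thenAccept(Consumer<? super T> action) {
        T r = result.get();
        if (r != null) {            // already completed: the calling thread runs it
            action.accept(r);
            return;
        }
        Node<T> n = new Node<>(action);
        do {
            n.next = stack.get();   // link in front of the current top
        } while (!stack.compareAndSet(n.next, n));
        // The result may have been published while we were pushing; in that
        // case this (registering) thread helps pop and run pending actions.
        if (result.get() != null) postComplete();
    }

    /** Publishes the result, then pops and runs pending actions on the calling thread. */
    boolean complete(T value) {
        if (!result.compareAndSet(null, value)) return false;
        postComplete();
        return true;
    }

    private void postComplete() {
        T r = result.get();
        Node<T> h;
        while ((h = stack.get()) != null) {
            // Whichever thread wins the CAS runs that action.
            if (stack.compareAndSet(h, h.next)) h.action.accept(r);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        ToyFuture<String> f = new ToyFuture<>();
        f.thenAccept(v -> log.add("first:" + v));
        f.thenAccept(v -> log.add("second:" + v));
        f.complete("x");
        f.thenAccept(v -> log.add("late:" + v));
        System.out.println(log); // prints [second:x, first:x, late:x]
    }
}
```

In this toy, pending actions come off the stack last-in-first-out, and a thread that registers an action right after the result is published may end up running actions registered by other threads. Both effects belong to this sketch and should not be read as guarantees about the real class.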