Java 8 CompletableFuture , Stream and Timeouts

8.3k views Asked by At

i'm trying to process some amount of data concurrently using CompletableFuture and Stream So far i have:

public static void main(String[] args) throws InterruptedException, ExecutionException {

    List<String> collect = Stream.of("1", "2", "3", "4", "5",
            "6", "7")
            .map(x -> CompletableFuture.supplyAsync(getStringSupplier(x)))
    System.out.println("stop out!");

public static Supplier<String> getStringSupplier(String text) {
    return () -> {

        System.out.println("start " + text);
        try {
        } catch (InterruptedException e) {
        System.out.println("stop " + text);
        return "asd" + text;

And output is fine :

start start 1 start 4 start 3 start 2 start 5 start 6 start 7 stop 4 stop 1 stop 5 stop 2 stop 6 stop 3 stop 7 stop out!

However right now i want to add timeout to that job. Lets say it should be canceled after 1 SECOND. And return null or some other value to collect list. (I would prefer some value indicating cause).

How can i achieve that ?

Thanks for help in advance.


There are 4 answers

user2377971 On BEST ANSWER

I have found the way of doing that:

 private static final ScheduledExecutorService scheduler =
                new ThreadFactoryBuilder()

public static void main(String[] args) throws InterruptedException, ExecutionException {
    final CompletableFuture<Object> oneSecondTimeout = failAfter(Duration.ofSeconds(1))
            .exceptionally(xxx -> "timeout exception");
    List<Object> collect = Stream.of("1", "2", "3", "4", "5", "6", "7")
            .map(x -> CompletableFuture.anyOf(createTaskSupplier(x)
                    , oneSecondTimeout))
    System.out.println("stop out!");

public static CompletableFuture<String> createTaskSupplier(String x) {
    return CompletableFuture.supplyAsync(getStringSupplier(x))
            .exceptionally(xx -> "PROCESSING ERROR : " + xx.getMessage());

public static Supplier<String> getStringSupplier(String text) {
    return () -> {

        System.out.println("start " + text);
        try {
        } catch (InterruptedException e) {
        if (text.equals("1")) {
            throw new RuntimeException("LOGIC ERROR");
        try {
            if (text.equals("7"))
        } catch (InterruptedException e) {
        System.out.println("stop " + text);
        return "result " + text;

public static <T> CompletableFuture<T> failAfter(Duration duration) {
    final CompletableFuture<T> promise = new CompletableFuture<>();
    scheduler.schedule(() -> {
        final TimeoutException ex = new TimeoutException("Timeout after " + duration);
        return promise.completeExceptionally(ex);
    }, duration.toMillis(), MILLISECONDS);
    return promise;

It returns :

 start 1
 start 3
 start 4
 start 2
 start 5
 start 6
 start 7
 stop 6
 stop 4
 stop 3
 stop 5
 stop 2
 stop out!
 [PROCESSING ERROR : java.lang.RuntimeException: LOGIC ERROR, result 2, result 3, result 4, result 5, result 6, timeout exception]`

What do you think about that, can you spot any flaws of that solution ?

Rohit Gulati On

you can try CompletableFuture's overloaded supplyAsync method with executor parameter (CompletableFuture.supplyAsync(getStringSupplier(x), timeoutExecutorService)) and can refer link for timeoutExecutorService.

Nitin Puri On

You can wrap the job in another CompletableFuture and it would give out a TimeoutException if the given time is exceeded. You can separate the TimeoutException catch block if you want to handle it specially.

    List<String> collect = null;
    try {
        collect = CompletableFuture.supplyAsync(() ->
                Stream.of("1", "2", "3", "4", "5",
                        "6", "7")
                        .map(x -> CompletableFuture.supplyAsync(getStringSupplier(x)))
        ).get(5, TimeUnit.SECONDS);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        //separate out the TimeoutException if you want to handle it differently 


    System.out.println(collect); //would be null in case of any exception
ytterrr On

For others, who are not limited with Java 8, you can use completeOnTimeout method, which was introduced in Java 9.

List<String> collect = Stream.of("1", "2", "3", "4", "5", "6", "7")
        .map(x -> CompletableFuture.supplyAsync(getStringSupplier(x))
                .completeOnTimeout(null , 1, SECONDS))