Is there a way to put a CompletableFuture in a loop?

The problem with the code below is that I have to wait for all three tasks to finish.

If the 1st and 2nd tasks complete in 200 ms and the 3rd completes in 2 s, then I will have to wait 2 s before I can load the next three URLs.

Ideally, I would send a new request as soon as each task finishes and somehow block the main thread until the ArrayList is empty.

In simple terms I would like each completable future to run in a kind of loop that is triggered by the old task completing.

(I do this quite often in JavaScript using events)

Can anybody think how I might achieve this?

private static void httpClientExample() {

    ArrayList<String> urls = new ArrayList<>(
            Arrays.asList(
                    "https://www.bing.com/",
                    "https://openjdk.java.net/",
                    "https://openjdk.java.net/",
                    "https://google.com/",
                    "https://github.com/",
                    "https://stackoverflow.com/"
            ));

    HttpClient httpClient = HttpClient.newHttpClient();

    var task1 = httpClient.sendAsync(HttpRequest.newBuilder()
            .uri(URI.create(urls.get(0)))
            .build(), HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::uri).thenAccept(System.out::println);

    var task2 = httpClient.sendAsync(HttpRequest.newBuilder()
            .uri(URI.create(urls.get(1)))
            .build(), HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::uri).thenAccept(System.out::println);

    var task3 = httpClient.sendAsync(HttpRequest.newBuilder()
            .uri(URI.create(urls.get(2)))
            .build(), HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::uri).thenAccept(System.out::println);

    // Block until all three tasks have completed
    CompletableFuture.allOf(task1, task2, task3).join();
    
    // Get the next 3 URLs

    System.out.println("Main Thread Completed");
}

There are 2 answers

Holger (BEST ANSWER)

Letting each job itself remove another pending URL and submit it would require a thread-safe queue.
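For illustration, here is a minimal sketch of that self-feeding variant. It is not part of the original answer: the class and method names (`SelfFeedingWorkers`, `processAll`, `next`) are hypothetical, and the sketch assumes that a failed task should simply be skipped so that its chain keeps going. A `ConcurrentLinkedQueue` serves as the thread-safe queue.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;

public class SelfFeedingWorkers {

    // Starts `workers` chains. Each chain takes the next item from the shared
    // queue as soon as its previous task has completed. The returned future
    // completes once the queue is drained and every chain has finished.
    public static <T> CompletableFuture<Void> processAll(
            List<T> items, int workers, Function<T, CompletableFuture<?>> task) {
        Queue<T> queue = new ConcurrentLinkedQueue<>(items);
        CompletableFuture<?>[] chains = new CompletableFuture<?>[workers];
        for (int i = 0; i < workers; i++) {
            chains[i] = next(queue, task);
        }
        return CompletableFuture.allOf(chains);
    }

    private static <T> CompletableFuture<Void> next(
            Queue<T> queue, Function<T, CompletableFuture<?>> task) {
        T item = queue.poll(); // thread-safe: each item is handed out exactly once
        if (item == null) {
            // Nothing left to do, so this chain ends here
            return CompletableFuture.completedFuture(null);
        }
        return task.apply(item)
                // Assumption: a failure is ignored so the chain continues
                .handle((result, error) -> null)
                // Triggered by the completion of the previous task
                .thenCompose(ignored -> next(queue, task));
    }
}
```

With the `HttpClient` from the question, `task` could be a lambda such as `url -> httpClient.sendAsync(...).thenApply(HttpResponse::uri).thenAccept(System.out::println)`, and the main thread would call `processAll(urls, 3, task).join()`. For very long lists, the nested `thenCompose` chain may become a concern, which is one more reason the main-thread approach can be simpler.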

It might be easier to let the main thread do it, for example:

var httpClient = HttpClient.newHttpClient();
// Holds the requests that are currently in flight (at most three)
var pending = new ArrayDeque<CompletableFuture<?>>(3);
for (String url : urls) {
    // While three requests are in flight and none of them has finished yet,
    // block until at least one completes
    while (pending.size() >= 3 && !pending.removeIf(CompletableFuture::isDone)) {
        CompletableFuture.anyOf(pending.toArray(CompletableFuture<?>[]::new)).join();
    }

    pending.addLast(httpClient.sendAsync(HttpRequest.newBuilder()
            .uri(URI.create(url))
            .build(), HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::uri).thenAccept(System.out::println));
}
// Wait for the remaining requests
CompletableFuture.allOf(pending.toArray(CompletableFuture<?>[]::new)).join();

This waits until at least one of the three submitted jobs has completed (using anyOf/join) before submitting the next one. When the loop ends, up to three jobs may still be running. The subsequent allOf/join after the loop waits for those jobs, so all jobs have completed afterwards. If you want the initiating thread to proceed as soon as all jobs have been submitted, without waiting for their completion, just remove the last statement.
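One thing to keep in mind is that `join()` on the result of `anyOf` or `allOf` throws a `CompletionException` if the relevant job completed exceptionally, for example because a request failed. The following sketch wraps the same loop in a reusable helper and counts failures instead of propagating them. The names `BoundedLoop` and `runBounded` are hypothetical, and counting failures is only one possible policy, assumed here for illustration.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class BoundedLoop {

    // Runs `task` for every item with at most `limit` futures in flight.
    // Returns the number of tasks that completed exceptionally.
    public static <T> int runBounded(
            List<T> items, int limit, Function<T, CompletableFuture<?>> task) {
        var failures = new AtomicInteger();
        var pending = new ArrayDeque<CompletableFuture<?>>(limit);
        for (T item : items) {
            // Same waiting logic as in the loop above
            while (pending.size() >= limit && !pending.removeIf(CompletableFuture::isDone)) {
                CompletableFuture.anyOf(pending.toArray(CompletableFuture<?>[]::new)).join();
            }
            // handle(...) turns a failure into a normal completion,
            // so the join() calls in this method do not throw
            pending.addLast(task.apply(item).handle((result, error) -> {
                if (error != null) {
                    failures.incrementAndGet();
                }
                return null;
            }));
        }
        // Wait for the jobs that are still running after the loop
        CompletableFuture.allOf(pending.toArray(CompletableFuture<?>[]::new)).join();
        return failures.get();
    }
}
```

With the code above, the call would look like `runBounded(urls, 3, url -> httpClient.sendAsync(...).thenAccept(...))`.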

dpr

If you don't need to limit the number of parallel calls, things become a lot easier:

private static void httpClientExample() throws Exception {

  final ArrayList<String> urls = ...; //list of urls

  // Keep a reference to the pool so it can be shut down afterwards;
  // its non-daemon threads would otherwise keep the JVM alive
  final ExecutorService executor = Executors.newFixedThreadPool(10);
  final HttpClient httpClient = HttpClient.newBuilder().executor(executor).build();

  final List<CompletableFuture<Void>> allFutures = new ArrayList<>();
  try {
    // Submit all requests at once, without limiting how many are in flight
    for (String url : urls) {
      final CompletableFuture<Void> completableFuture = httpClient
          .sendAsync(HttpRequest.newBuilder().uri(URI.create(url)).build(),
              HttpResponse.BodyHandlers.ofString())
          .thenApply(HttpResponse::uri).thenAccept(System.out::println);
      allFutures.add(completableFuture);
    }

    // Wait for all requests to finish
    CompletableFuture.allOf(allFutures.toArray(CompletableFuture[]::new)).get();
  } finally {
    executor.shutdown();
  }
}