First Object in Set<Future<Object>> that satisfies a predicate

280 views Asked by At

Abstract idea

I want to get the first value coming out of a set of Futures, that satisfies a given predicate.

If a satisfying value is found, all other Futures should be cancelled. If no value is found after all Futures have returned the execution should be terminated (by returning a default value or throwing an exception).

Concrete example

public boolean isThereVacantHotelRooms(Set<URL> hotelApiUrls) {
    // returns true if any provided server responds with a number larger than 0
}

I'm looking for a pretty way of implementing the above in Java 8 (external libs are fine). I have tried implementing it with CompletableFuture as well as RxJava, but I both feel very non-idiomatic for this problem and I end up with lots of ugly code.

2

There are 2 answers

0
akarnokd On BEST ANSWER

I think, your case can be accomplished with a combination of merge, filter and take:

List<Observable<HotelInfo>> hotels = new ArrayList<>();
for (URL u : urls) {
    Observable<HotelInfo> hotelInfo = networkAPI.askHotel(u);
    hotels.add(hotelInfo);
}
Observable.merge(hotels)
.filter(h -> h.vacancy > 0)
.take(1)
.subscribe(h -> System.out.println("Winner: " + h), Throwable::printStackTrace);
0
ZhongYu On

Since you have tried other solutions, here's the bayou solution for comparison

// throw exception for failure
public void checkVacantHotelRooms(Set<URL> hotelApiUrls) throws Exception
{
    checkVacantHotelRoomsAsync(hotelApiUrls)  // Async<Void>
        .timeout(Duration.ofSeconds(10))      // cancel on 10s
        .sync();    //  async -> sync
}

public Async<Void> checkVacantHotelRoomsAsync(Set<URL> hotelApiUrls)
{
    Stream<Async<Void>> resultStream = hotelApiUrls.stream()    // Stream<URL>
        .map(this::getResponseBodyAsync)                        // Stream<Async<String>>
        .map(asyncBody->asyncBody.map(this::checkResponse));    // Stream<Async<Void>

    return AsyncBundle.anyOf(resultStream); 
    // succeeds if one result succeeds; others will be cancelled
}

Void checkResponse(String responseBody) throws Exception
{
    if(responseBody.contains("something"))
        return (Void)null;
    throw new Exception("none in this hotel");
}

-----

HttpClient httpClient = new HttpClient();
int maxBodyLength = 1000;

Async<String> getResponseBodyAsync(URL url)
{
    Async<HttpResponse> asyncResponse = httpClient.doGet(url.toExternalForm());
    return asyncResponse.then(resp->resp.bodyString(maxBodyLength));
}