Converting a collection of Awaitables to a time-ordered AsyncGenerator in Hack

620 views Asked by At

I am trying to implement Rx stream/observable merging with Hack async, and a core step is described by the title. A code version of this step would look something like this:

<?hh // strict
async function foo(Awaitable<Iterable<T>> $collection): Awaitable<void> {
  $ordered_generator = async_collection_to_gen($collection) // (**)
  foreach($ordered_generator await as $v) {
    // do something with each awaited value in the time-order they are resolved
  }
}

However, after mulling it over, I don't think I can write the starred (**) function. I've found that at some point or another, the implementations I've tried require functionality akin to JS's Promise.race, which resolves when the first of a collection Promises resolves/rejects. However, all of Hack's Awaitable collection helpers create an Awaitable of a fully resolved collection. Furthermore, Hack doesn't permit that we don't await async calls from async functions, which I've also found to be necessary.

Is it possible to anyone's knowledge?

1

There are 1 answers

0
concat On

This is possible actually! I dug around and stumbled upon a fork of asio-utilities by @jano implementing an AsyncPoll class. See PR for usage. It does exactly as I hoped.

So it turns out, there is an Awaitable called ConditionWaitHandle with succeed and fail methods* that can be invoked by any context (so long as the underlying WaitHandle hasn't expired yet), forcing the ConditionWaitHandle to resolve with the passed values.

I gave the code a hard look, and underneath it all, it works by successive Awaitable races, which ConditionWaitHandle permits. More specifically, the collection of Awaitables is compressed via AwaitAllWaitHandles (aka \HH\Asio\v) which resolves as slowly as the slowest Awaitable, then nested within a ConditionWaitHandle. Each Awaitable is awaited in an async function that triggers the common ConditionWaitHandle, concluding the race. This is repeated until the Awaitables have all resolved.

Here's a more compact implementation of a race using the same philosophy:

<?hh
function wait(int $i): Awaitable<void> {
    return Race::wrap(async { await HH\Asio\usleep($i); return $i; });
}
// $wait_handle = null;
class Race {
    public static ?ConditionWaitHandle $handle = null;
    public static async function wrap<T>(Awaitable<T> $v): Awaitable<void> {
        $ret = await $v;
        $handle = self::$handle;
        invariant(!is_null($handle), '');
        $handle->succeed($ret);
    }
}
Race::$handle = ConditionWaitHandle::create(
    \HH\Asio\v(
        Vector{
            (wait(1))->getWaitHandle(), 
            (wait(1000000))->getWaitHandle()
        }
    )
);
printf("%d microsecond `wait` wins!", \HH\Asio\join(Race::$handle));

Very elegant solution, thanks @jano!

*(the semblance to promises/deferred intensifies)


I am curious how premature completion via ConditionWaitHandle meshes with the philosophy that all Awaitables should run to completion.