How to stream results from n parallel requests sent to x clients concurrently?


I have several identical api services available on different clients to which I would like to send several http-requests. This data can be transformed into a map (Lookup<TKey, TSource>). Additionally, I want to return the response from a service as soon as it is available, through an IAsyncEnumerable.

A naive implementation would be to run through all requests one by one. However, I would like to make this more efficient by starting n requests to x services concurrently. Note that I want to limit/throttle both n and x. In other words, x is how many clients I communicate with at once; I will not start communication with a new client until all of its requests have been processed. n defines how many parallel open requests I can have to one given client at once.

Is there a preferred way to do this or can it be done better than what I have come up with?

I have tried to make a solution using TransformBlock and TransformManyBlock, but I am not very familiar with these types, so any feedback is appreciated.

public static async IAsyncEnumerable<TOut> ParallelForEveryEntry<TKey, TSource, TOut>(
    this IEnumerable<TSource> source,
    Func<TSource, TKey> keySelector,
    int n,
    int x,
    Func<TSource, Task<TOut>> callback,
    CancellationToken token = default
) where TKey : notnull {
    if (!source.Any()) {
        yield break;
    }

    var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);

    TransformManyBlock<IGrouping<TKey, TSource>, TOut> blockCollection = new(collectionByEntry => {
        var linkedToken = linkedTokenSource.Token;

        // Inner block: up to n open requests to a single client
        TransformBlock<TSource, TOut> blockEntry = new(callback, new ExecutionDataflowBlockOptions() {
            CancellationToken = linkedToken,
            MaxDegreeOfParallelism = n,
            EnsureOrdered = false
        });

        // Feed the block with input data
        foreach (var entry in collectionByEntry) {
            blockEntry.Post(entry);
        }
        blockEntry.Complete();

        return blockEntry.ReceiveAllAsync().ToEnumerable();
    }, new ExecutionDataflowBlockOptions() {
        // Outer block: up to x clients processed at once
        CancellationToken = token,
        MaxDegreeOfParallelism = x,
        EnsureOrdered = false
    });

    // Feed the block with input data
    foreach (var collectionByEntry in source.ToLookup(keySelector)) {
        blockCollection.Post(collectionByEntry);
    }
    blockCollection.Complete();

    // Emit the output data as they become available
    while (await blockCollection.OutputAvailableAsync()) {
        while (blockCollection.TryReceive(out var item)) {
            yield return item;
        }
    }

    // Propagate possible exception (including cancellation)
    await blockCollection.Completion;
}