TPL Dataflow Consumer to Process Multiple Items at a time


I have a requirement to iterate through a large list and, for each item, call a web service to get some data. However, I want to throttle the requests to the web service to, say, no more than 5 executing concurrently at any one time. All calls to the web service are made using async/await.

I am using the TPL Dataflow BufferBlock with a BoundedCapacity of 5. Everything works, but I notice that the consumer, which awaits the web service call, holds up the queue until that call is finished, so all the requests in the BufferBlock are performed serially. Is it possible to have the consumer always processing 5 items off the queue at a time? Or do I need to set up multiple consumers or start looking into ActionBlock?

In a nutshell, I want to seed the queue with 5 items. As one item finishes processing, a sixth takes its place, and so on, so that I always have 5 concurrent requests in flight until there are no more items to process.

I used this as my guide: Async Producer/Consumer Queue using Dataflow

Thanks for any help. Below is a simplified version of the code:

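using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

//set up
var queue = new BufferBlock<CustomObject>(new DataflowBlockOptions { BoundedCapacity = 5 });
Task producer = QueueValues(queue, values);
Task<int> consumer = ConsumeValues(queue);
await Task.WhenAll(producer, consumer, queue.Completion);
int counter = await consumer;

//producer
static async Task QueueValues(BufferBlock<CustomObject> queue, IList<CustomObject> values)
{
    foreach (CustomObject value in values)
    {
        // SendAsync waits while the buffer already holds 5 items
        await queue.SendAsync(value);
    }
    queue.Complete();
}

//consumer
static async Task<int> ConsumeValues(BufferBlock<CustomObject> queue)
{
    int processed = 0;
    while (await queue.OutputAvailableAsync())
    {
        CustomObject value = await queue.ReceiveAsync();
        // Awaiting here means the next item is not received until this call
        // finishes, so the requests run one at a time.
        await CallWebServiceAsync(value);
        processed++;
    }
    return processed;
}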
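The question's other idea, several consumers, can also be made to work. The following is a minimal sketch, not code from the question or the answers: it starts five copies of the receive loop on one BufferBlock and records the peak number of concurrent calls. `int` stands in for CustomObject, and `callWebServiceAsync` is a hypothetical delegate for the real call. With several consumers, TryReceive replaces ReceiveAsync, because OutputAvailableAsync can report an item that another consumer then takes, and a ReceiveAsync left waiting on an empty, completed block faults with InvalidOperationException.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class MultipleConsumers
{
    // Starts `consumerCount` copies of the receive loop on one BufferBlock.
    // Returns how many items were processed and the peak number of calls in flight.
    public static async Task<(int Processed, int Peak)> RunAsync(
        IEnumerable<int> values,
        Func<int, Task> callWebServiceAsync, // hypothetical stand-in for the real call
        int consumerCount = 5)
    {
        var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
        int processed = 0, inFlight = 0, peak = 0;

        async Task ConsumeAsync()
        {
            while (await queue.OutputAvailableAsync())
            {
                // Another consumer may take the item first, so use the
                // non-waiting TryReceive instead of ReceiveAsync.
                while (queue.TryReceive(out int item))
                {
                    UpdateMax(ref peak, Interlocked.Increment(ref inFlight));
                    try
                    {
                        await callWebServiceAsync(item);
                    }
                    finally
                    {
                        Interlocked.Decrement(ref inFlight);
                    }
                    Interlocked.Increment(ref processed);
                }
            }
        }

        Task[] consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => ConsumeAsync())
            .ToArray();

        foreach (int value in values)
        {
            await queue.SendAsync(value); // waits while 5 items are already buffered
        }
        queue.Complete();

        await Task.WhenAll(consumers);
        return (processed, peak);
    }

    // Raises `target` to `value` if `value` is larger, without a lock.
    private static void UpdateMax(ref int target, int value)
    {
        int current;
        while (value > (current = Volatile.Read(ref target)) &&
               Interlocked.CompareExchange(ref target, value, current) != current)
        {
            // Another thread changed `target`; re-read and try again.
        }
    }
}
```

Calling `RunAsync(Enumerable.Range(0, 20), _ => Task.Delay(50))` should process all 20 items with no more than 5 calls in flight at once, since each consumer awaits its own call before taking another item.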

There are 2 answers

Answer by usr

Your use of TPL Dataflow is rather strange. Normally, you'd move the consumption and processing into the flow: append a TransformBlock that calls the web service, and delete ConsumeValues.
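Moving the processing into the flow might look like the following minimal sketch (not usr's code): the BufferBlock is linked to a TransformBlock that makes the calls, and an ActionBlock collects the results. `int` and `string` stand in for CustomObject and the returned data, and `callWebServiceAsync` is a hypothetical delegate for the real call. The BufferBlock is kept only to mirror the question's setup; items could be sent to the TransformBlock directly.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BufferToTransform
{
    public static async Task<List<string>> RunAsync(
        IEnumerable<int> values,
        Func<int, Task<string>> callWebServiceAsync) // hypothetical stand-in for the real call
    {
        var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });

        // Up to 5 calls run at once; by default the TransformBlock still
        // emits results in the order the inputs arrived.
        var call = new TransformBlock<int, string>(
            callWebServiceAsync,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

        var results = new List<string>();
        // ActionBlock defaults to MaxDegreeOfParallelism = 1, so List.Add is never raced.
        var collect = new ActionBlock<string>(results.Add);

        // PropagateCompletion passes Complete() down the chain.
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        queue.LinkTo(call, link);
        call.LinkTo(collect, link);

        foreach (int value in values)
        {
            await queue.SendAsync(value);
        }
        queue.Complete();

        // Completes only after every call has finished and every result was added.
        await collect.Completion;
        return results;
    }
}
```

Because completion propagates along both links, awaiting the last block's Completion is enough; there is no hand-written consumer loop left to run sequentially.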

ConsumeValues awaits each web service call before it receives the next item, so it executes sequentially, which is fundamentally not what you want.

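Instead of BoundedCapacity, I think you want MaxDegreeOfParallelism: BoundedCapacity limits how many items a block holds, while MaxDegreeOfParallelism limits how many it processes at once.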
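The difference between the two options can be checked by measuring how many calls are in flight at once. This is a minimal sketch with a hypothetical `PeakConcurrency` helper; a 50 ms `Task.Delay` stands in for the web service call.

```csharp
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PeakConcurrency
{
    // Sends `items` items through an ActionBlock whose delegate simulates a
    // 50 ms web service call, and returns the most calls seen in flight at once.
    public static async Task<int> MeasureAsync(ExecutionDataflowBlockOptions options, int items = 20)
    {
        int inFlight = 0, peak = 0;
        var block = new ActionBlock<int>(async _ =>
        {
            int now = Interlocked.Increment(ref inFlight);
            int seen;
            // Lock-free "peak = max(peak, now)".
            while (now > (seen = Volatile.Read(ref peak)) &&
                   Interlocked.CompareExchange(ref peak, now, seen) != seen)
            {
            }
            await Task.Delay(50); // stand-in for the web service latency
            Interlocked.Decrement(ref inFlight);
        }, options);

        for (int i = 0; i < items; i++)
        {
            await block.SendAsync(i);
        }
        block.Complete();
        await block.Completion;
        return peak;
    }
}
```

With `new ExecutionDataflowBlockOptions { BoundedCapacity = 5 }` the peak stays at 1, because MaxDegreeOfParallelism defaults to 1; with `MaxDegreeOfParallelism = 5` several calls overlap, up to 5.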

Answer by i3arnon

You should be using ActionBlock with MaxDegreeOfParallelism set to 5. You may also want to set a BoundedCapacity, but that's for throttling the producer, not the consumer:

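var block = new ActionBlock<CustomObject>(
    item => CallWebServiceAsync(item), // the block awaits the returned Task
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5, // at most 5 calls in flight at once
        BoundedCapacity = 1000      // SendAsync waits once 1000 items are queued or in flight
    });

foreach (CustomObject value in values)
{
    await block.SendAsync(value);
}
block.Complete();       // no more items will be sent
await block.Completion; // wait for the remaining calls to finish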
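To see that BoundedCapacity throttles the producer rather than the consumer, the minimal sketch below fills a block whose delegate is held on a gate. The `TaskCompletionSource` gate stands in for slow web service calls, and the capacity of 2 is illustrative, not from the answer. Post, which never waits, is refused once the block is full, whereas SendAsync returns a task that completes only when the block has room.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ProducerThrottling
{
    public static async Task<(bool ThirdPost, bool SendWaited, bool SendAccepted)> RunAsync()
    {
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        // Every item waits on the gate, so nothing finishes until it opens.
        // BoundedCapacity counts the item being processed as well as buffered ones.
        var block = new ActionBlock<int>(
            _ => gate.Task,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 2 });

        block.Post(1);                  // accepted: being processed (held by the gate)
        block.Post(2);                  // accepted: buffered
        bool thirdPost = block.Post(3); // refused: the block is at capacity

        Task<bool> send = block.SendAsync(3);
        bool sendWaited = !send.IsCompleted; // the producer is held back here

        gate.SetResult(true);             // let the simulated calls finish
        bool sendAccepted = await send;   // completes once a slot frees up

        block.Complete();
        await block.Completion;
        return (thirdPost, sendWaited, sendAccepted);
    }
}
```

In the answer's loop, this waiting inside `await block.SendAsync(value)` is what keeps a fast producer from queuing an unbounded number of items; the number of concurrent calls is still governed by MaxDegreeOfParallelism.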