I'm designing a pretty simple producer-consumer with Dataflow. I need a BatchBlock to batch work for a rather slow consumer (which can only be single-threaded due to the EF DbContext). Everything works as expected, except that my producer produces more messages than I've set in the BoundedCapacity. I was expecting the producer to produce 1000 messages and then wait. What am I doing wrong?
[Test]
public async Task Run()
{
int producerCount = 0;
int consumerCount = 0;
Action logProgress = () =>
{
if (consumerCount % 1000 == 0 && consumerCount > 0 || producerCount % 1000 == 0 || consumerCount == producerCount)
{
Trace.WriteLine($"Progress - {consumerCount}/{producerCount}");
}
};
var batchBlock = new BatchBlock<int>(500, new GroupingDataflowBlockOptions { BoundedCapacity = 1000 });
var actionBlock = new ActionBlock<int[]>(async x =>
{
await Task.Delay(10);
foreach (var specificItem in x)
{
Interlocked.Increment(ref consumerCount);
}
logProgress();
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = 1000,
});
batchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });
int[] items = Enumerable.Range(0, 240000).ToArray();
foreach (var item in items)
{
await batchBlock.SendAsync(item);
Interlocked.Increment(ref producerCount);
logProgress();
}
batchBlock.Complete();
await Task.WhenAll(batchBlock.Completion, actionBlock.Completion);
}
Output:
Progress - 0/1000
Progress - 0/2000
Progress - 0/3000
Progress - 0/4000
Progress - 0/5000
Progress - 0/6000
Progress - 0/7000
Progress - 0/8000
Progress - 0/9000
Progress - 0/10000
Progress - 0/11000
...
Progress - 31000/240000
Progress - 31500/240000
Progress - 32000/240000
Progress - 32500/240000
Progress - 33000/240000
Progress - 33500/240000
...
240,000 items, batched in batches of 500 items each, can be arranged in 480 batches. These are fewer than the 1,000 limit imposed by the BoundedCapacity policy of the ActionBlock<int[]>, so this policy has no observable effect with the given workload. Specifically, the ActionBlock<int[]> always responds with DataflowMessageStatus.Accepted when the BatchBlock<int> invokes its OfferMessage method. It never responds with Postponed, which is the behavior of an ITargetBlock<TInput> that has reached the limit of its capacity. So the BoundedCapacity policy of the BatchBlock<int> has no effect either, because the BatchBlock<int> is always able to immediately propagate the batches it creates to its linked target.
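If the goal is to actually see the producer pause, the pushback has to come from the ActionBlock<int[]>. Keep in mind that its BoundedCapacity is counted in int[] messages, so 1000 means 1,000 batches (500,000 items), which this workload never reaches. Below is a minimal sketch (an illustration, not part of the original test) that expresses the ActionBlock's capacity in batches, for example 2, so that it starts postponing offers once it is full and the producer's SendAsync has to await:

using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedCapacityDemo
{
    // Hypothetical demo, adapted from the code in the question.
    public static async Task Run()
    {
        var batchBlock = new BatchBlock<int>(500,
            new GroupingDataflowBlockOptions { BoundedCapacity = 1000 }); // measured in int items

        var actionBlock = new ActionBlock<int[]>(async batch =>
        {
            await Task.Delay(10); // stand-in for the slow, single-threaded EF consumer
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1,
            // Measured in int[] messages: 2 means two *batches* (1,000 items),
            // not 1,000 batches (500,000 items) as in the original code.
            BoundedCapacity = 2,
        });

        batchBlock.LinkTo(actionBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var item in Enumerable.Range(0, 240000))
        {
            // When the ActionBlock starts postponing and the BatchBlock is full,
            // this await suspends until the consumer frees up capacity.
            await batchBlock.SendAsync(item);
        }

        batchBlock.Complete();
        await actionBlock.Completion;
    }
}

With the downstream capacity expressed in batches, the BatchBlock<int> can no longer hand off every batch immediately, its own 1,000-item BoundedCapacity starts to matter, and the producer should stay at most a few thousand items ahead of the consumer (the BatchBlock's buffer plus the batches buffered or being processed by the ActionBlock), instead of drifting tens of thousands ahead as in the logged output.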