I am studying Dataflow basics. Here is a code sample which looks correct to me but does not work. Instead of returning result integer value it freezes. In my understanding it is because completing is not being propagated between blocks.
public static int Process(int value)
{
var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
var options = new DataflowLinkOptions() { PropagateCompletion = true };
multiplyBlock.LinkTo(subtractBlock, options);
multiplyBlock.Post(value);
multiplyBlock.Complete();
subtractBlock.Completion.Wait(); // <--- code freezes here
return subtractBlock.Receive();
}
Could you please help me understand what is missing?
The
subtractBlock.Complete()is called automatically when the upstreammultiplyBlock.Completionis completed (because of thePropagateCompletion = true). At that point thesubtractBlock.Completionwill not complete immediately. It will complete when both the following two conditions become true:subtractBlockhave been processed.subtractBlockhave been accepted by a block downstream.In other words both the input and output buffers of a
TransformBlock<TInput, TOutput>must be empty, otherwise the block will never complete. In your case the output buffer of the block is not emptied, because the block is not linked to anything. You can solve this problem either by linking it to theNullTarget, or by replacing it with anActionBlock<TInput>.Btw be aware that currently the TPL Dataflow library has a bug that could cause a pipeline to never complete, under very specific and rare conditions. You can find demonstrations of this bug here. This bug has existed forever, and is unlikely to be resolved any time soon.