TPL Dataflow block never completes on PropagateCompletion


Since the last alteration to my propagated-completion pipeline, one of my buffer blocks never completes. Let me summarize what was working and what isn't anymore:

Previously working:

A.LinkTo(B, PropagateCompletion);
B.LinkTo(C, PropagateCompletion);
C.LinkTo(D, PropagateCompletion);
D.Receive();

// everything completes

No longer working:

A.LinkTo(B, PropagateCompletion);
C.LinkTo(D, PropagateCompletion);

await A.Completion;
someWriteOnceBlock.Post(B.Count);
// B.Complete(); commented on purpose
B.LinkTo(C, PropagateCompletion);

D.Receive();

// Only A reaches completion
// B remains in 'waiting for activation'
// C executes but obviously never completes since B doesn't either

If I uncomment the commented line, everything works, but obviously that line should not be necessary.

Somehow my BufferBlock B never reaches completion, even though the block linked to it is completed and propagates its completion, and the block linked from it receives all the buffered items.


There are 2 answers

James Lucas

Because you await the completion of A, none of the remaining code runs until A completes. That's how await works: the code after it is wrapped in a continuation that runs once the awaited task finishes. In this scenario, B is linked to C only after A has completed, so I think completion is not propagated.

Liam

I had the same problem - my final block was a TransformBlock that was just sitting there awaiting completion.

A block only completes once it has no inputs remaining to process, all its outputs have been consumed, and it has been told to complete, either by calling block.Complete() or by completion being propagated down the chain.
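The "outputs must be consumed" condition is easy to check in isolation. The following is a minimal C# sketch, not taken from the original posts: the class name CompletionDemo and the 100 ms delay are illustrative assumptions, and it assumes the System.Threading.Tasks.Dataflow package is referenced. It marks a BufferBlock as complete while it still holds items, then drains it.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; the name is not from the original posts.
public static class CompletionDemo
{
    public static async Task<(bool completedWhileFull, bool completedAfterDrain)> RunAsync()
    {
        var buffer = new BufferBlock<int>();
        buffer.Post(1);
        buffer.Post(2);

        // Signal that no more input will arrive.
        buffer.Complete();

        // Arbitrary pause: the block still holds two unconsumed items,
        // so its Completion task should remain pending.
        await Task.Delay(100);
        bool completedWhileFull = buffer.Completion.IsCompleted;

        // Consume the buffered outputs; only now can the block finish.
        buffer.Receive();
        buffer.Receive();
        await buffer.Completion;
        bool completedAfterDrain = buffer.Completion.IsCompleted;

        return (completedWhileFull, completedAfterDrain);
    }
}
```

Under these assumptions, the first flag should be false and the second true: calling Complete() alone is not enough while items are still sitting in the block.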

I fixed this by making the final block an ActionBlock, as this doesn't produce any outputs. Here's my code, that takes a series of text files and deserialises their contents:

' Requires: Imports System.Threading.Tasks.Dataflow
Dim maxDop = Environment.ProcessorCount
Dim executionOptions = New ExecutionDataflowBlockOptions With {.MaxDegreeOfParallelism = maxDop}
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

Dim inputBlock = New BufferBlock(Of String)
' Deserialise the contents of each file
Dim transformBlock = New TransformBlock(Of String, IEnumerable(Of JsonObject))(Function(fileName) DeserialiseFromFileAsync(fileName))
Dim outputBlock As New BufferBlock(Of IEnumerable(Of JsonObject))
' Final block is an ActionBlock, so it has no outputs left waiting to be consumed
Dim combineBlock = New ActionBlock(Of IEnumerable(Of JsonObject))(
    Sub(col)
        ' Do something to add all the collections together
    End Sub)

inputBlock.LinkTo(transformBlock, linkOptions)
transformBlock.LinkTo(outputBlock, linkOptions)
outputBlock.LinkTo(combineBlock, linkOptions)

For fileNo = 1 To 10
    inputBlock.Post(String.Concat("JsonFile", fileNo, ".txt"))
Next

inputBlock.Complete() 'Complete the first block, propagation will handle the rest
Await combineBlock.Completion 'Await the last block completing
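
For comparison, here is a rough C# sketch of why the choice of final block matters. It is an illustration rather than the code above: TerminalBlockDemo, the integer payloads, and the 200 ms delay are all assumptions made for the example. Two small pipelines receive the same input; one ends in a TransformBlock whose outputs nobody consumes, the other in an ActionBlock.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; the name is not from the original posts.
public static class TerminalBlockDemo
{
    public static async Task<(bool transformDone, bool actionDone)> RunAsync()
    {
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        // Pipeline 1: ends in a TransformBlock with no consumer for its outputs.
        var head1 = new BufferBlock<int>();
        var tail1 = new TransformBlock<int, int>(x => x * 2);
        head1.LinkTo(tail1, linkOptions);

        // Pipeline 2: ends in an ActionBlock, which produces no outputs.
        var head2 = new BufferBlock<int>();
        var sum = 0;
        var tail2 = new ActionBlock<int>(x => { sum += x; });
        head2.LinkTo(tail2, linkOptions);

        for (var i = 1; i <= 3; i++)
        {
            head1.Post(i);
            head2.Post(i);
        }

        // Complete only the head blocks; propagation carries it down the chain.
        head1.Complete();
        head2.Complete();

        await tail2.Completion;   // finishes once all items are processed
        await Task.Delay(200);    // arbitrary grace period for tail1

        return (tail1.Completion.IsCompleted, tail2.Completion.IsCompleted);
    }
}
```

In this sketch, the TransformBlock tail should still be pending because its three results are never received, while the ActionBlock tail completes. Linking the TransformBlock to a consumer (or to DataflowBlock.NullTarget<int>() if the results are not needed) would be another way to let it finish.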