From the TPL documentation:
As with ActionBlock<TInput>, TransformBlock<TInput,TOutput> defaults to processing one message at a time, maintaining strict FIFO ordering.
However, in a multi-threaded scenario, i.e. if multiple threads are "simultaneously" calling SendAsync and then awaiting a result by calling ReceiveAsync, how do we guarantee that the thread that posted something into the TransformBlock<TInput,TOutput> actually gets the result it is waiting for?
In my experiments, the way to "guarantee" my desired outcome seems to be adding the option BoundedCapacity = 1. At least the threads still don't get blocked when sending and receiving.
If I don't do this, some threads will receive the result intended for another thread.
Is this the right approach in this particular use case?
Here is some code that illustrates my concern:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // needed for CancellationToken
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace ConsoleTransformBlock
{
    class Program
    {
        private readonly static TransformBlock<int, int> _pipeline;
        static Program()
        {
            _pipeline = new TransformBlock<int, int>(async (input) =>
            {
                await Task.Delay(RandomGen2.Next(5, 100)).ConfigureAwait(false);
                return input;
            },
            new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 }); // this is the fix???
        }
        static void Main(string[] args)
        {
            var dop = System.Environment.ProcessorCount;// 8-core
            Parallel.For(0, dop, new ParallelOptions() { MaxDegreeOfParallelism = dop },
                (d) =>
                {
                    DoStuff().Wait();
                });
            Console.WriteLine("Parallel For Done ...");
            var tasks = new Task[dop];
            for (var i = 0; i < dop; i++)
            {
                var temp = i;
                tasks[temp] = Task.Factory.StartNew
                    (async () => await DoStuff().ConfigureAwait(false),
                    CancellationToken.None,
                    TaskCreationOptions.LongRunning,
                    TaskScheduler.Default).Unwrap();
            }
            Task.WaitAll(tasks);
        }
        private static async Task DoStuff()
        {
            for (var i = 0; i < 100; i++)
            {
                var temp = RandomGen2.Next();
                await _pipeline.SendAsync(temp).ConfigureAwait(false);
                Console.WriteLine("Just sent {0}, now waiting {1}...", new object[] { temp, System.Threading.Thread.CurrentThread.ManagedThreadId });
                await Task.Delay(RandomGen2.Next(5, 50)).ConfigureAwait(false);
                var result = await _pipeline.ReceiveAsync().ConfigureAwait(false);
                Console.WriteLine("Received {0}... {1}", new object[] { result, System.Threading.Thread.CurrentThread.ManagedThreadId });
                if (result != temp)
                {
                    // The thread id is now included in the message via {2}.
                    var error = string.Format("************** Sent {0} But Received {1} (thread {2})", temp, result, System.Threading.Thread.CurrentThread.ManagedThreadId);
                    Console.WriteLine(error);
                    break;
                }
            }
        }
        /// <summary>
        /// Thread-Safe Random Generator
        /// </summary>
        public static class RandomGen2
        {
            private static Random _global = new Random();
            [ThreadStatic]
            private static Random _local;
            public static int Next()
            {
                return Next(0, int.MaxValue);
            }
            public static int Next(int max)
            {
                return Next(0, max);
            }
            public static int Next(int min, int max)
            {
                Random inst = _local;
                if (inst == null)
                {
                    int seed;
                    lock (_global) seed = _global.Next();
                    _local = inst = new Random(seed);
                }
                return inst.Next(min, max);
            }
        }
    }
}
TransformBlock already maintains FIFO order. The order in which you post items to the block is the exact order in which the items will be returned from the block (from Dataflow (Task Parallel Library)).
You can see that with this example:
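A minimal sketch of such an example, assuming a degree of parallelism of 10 and random delays of 5 to 100 ms (both illustrative choices; OrderingDemo and RunAsync are hypothetical names, and async Main needs C# 7.1 or later):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class OrderingDemo
{
    // Posts 0..99 to a parallel TransformBlock and returns the results
    // in the order the block hands them back.
    public static async Task<List<int>> RunAsync()
    {
        var random = new Random();
        var gate = new object(); // Random is not thread-safe, so guard it

        var block = new TransformBlock<int, int>(async input =>
        {
            int delay;
            lock (gate) delay = random.Next(5, 100);
            await Task.Delay(delay); // items finish in a scrambled order
            return input;
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

        foreach (var number in Enumerable.Range(0, 100))
        {
            await block.SendAsync(number);
        }

        var results = new List<int>();
        for (var i = 0; i < 100; i++)
        {
            results.Add(await block.ReceiveAsync());
        }
        return results;
    }

    public static async Task Main()
    {
        foreach (var result in await RunAsync())
        {
            Console.WriteLine(result);
        }
    }
}
```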
The results come out in the order they were posted, 0 through 99.
However, what you seem to want is some correlation with threads, so that a thread posts an item to the block and then receives its own result. This doesn't really fit TPL Dataflow, which is meant to be a pipeline of blocks. You can hack it with BoundedCapacity = 1, which limits the block to a single in-flight item and so effectively serializes all callers, but you probably shouldn't.
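If per-caller correlation is really what is needed, one commonly used alternative (not part of the answer above, and only a sketch) is to send a TaskCompletionSource along with each input, so every caller awaits its own result instead of calling ReceiveAsync on a shared block. CorrelatedPipeline and ProcessAsync are hypothetical names, the 10 ms delay stands in for real work, and the tuple syntax assumes C# 7 or later:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper: each request carries its own completion source.
public sealed class CorrelatedPipeline
{
    private readonly ActionBlock<(int Input, TaskCompletionSource<int> Completion)> _block;

    public CorrelatedPipeline()
    {
        _block = new ActionBlock<(int Input, TaskCompletionSource<int> Completion)>(async item =>
        {
            try
            {
                await Task.Delay(10); // stand-in for the real transformation
                item.Completion.TrySetResult(item.Input);
            }
            catch (Exception ex)
            {
                // Report the failure to the one caller that owns this item.
                item.Completion.TrySetException(ex);
            }
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
    }

    // The returned task completes with the result for this input only.
    public async Task<int> ProcessAsync(int input)
    {
        var completion = new TaskCompletionSource<int>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        if (!await _block.SendAsync((input, completion)))
        {
            throw new InvalidOperationException("The block declined the message.");
        }
        return await completion.Task;
    }
}

public static class CorrelationDemo
{
    public static async Task Main()
    {
        var pipeline = new CorrelatedPipeline();
        // Eight concurrent callers; each one gets back its own value.
        var results = await Task.WhenAll(
            Enumerable.Range(0, 8).Select(i => pipeline.ProcessAsync(i)));
        Console.WriteLine(string.Join(", ", results));
    }
}
```

Because the result travels back through the caller's own task, the block can run with parallelism greater than one without any risk of one caller picking up another caller's output.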