Guarantee TransformBlock output sequence


From the TPL documentation

As with ActionBlock<TInput>, TransformBlock<TInput,TOutput> defaults to processing one message at a time, maintaining strict FIFO ordering.

However, in a multi-threaded scenario, i.e. when multiple threads are simultaneously calling SendAsync and then awaiting a result via ReceiveAsync, how do we guarantee that the thread that posted an item into the TransformBlock<TInput,TOutput> actually receives the result intended for it?

In my experiments, the way to "guarantee" my desired outcome seems to be to add the option BoundedCapacity = 1. At least the threads still don't get blocked when sending and receiving.

If I don't do this, some threads will receive the result intended for another thread.

Is this the right approach in this particular use case?

Here is some code that illustrates my concern:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace ConsoleTransformBlock
{
    class Program
    {
        private readonly static TransformBlock<int, int> _pipeline;

        static Program()
        {

            _pipeline = new TransformBlock<int, int>(async (input) =>
            {
                await Task.Delay(RandomGen2.Next(5, 100)).ConfigureAwait(false);
                return input;

            }, 
            new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 }); // this is the fix???
        }


        static void Main(string[] args)
        {
            var dop = System.Environment.ProcessorCount; // 8-core


            Parallel.For(0, dop, new ParallelOptions() { MaxDegreeOfParallelism = dop },
                 (d) =>
                 {
                     DoStuff().Wait();
                 });

            Console.WriteLine("Parallel For Done ...");
            var tasks = new Task[dop];
            for (var i = 0; i < dop; i++)
            {
             var temp = i;
             tasks[temp] = Task.Factory.StartNew
                (async () => await DoStuff().ConfigureAwait(false),
                CancellationToken.None,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default).Unwrap();
            }

            Task.WaitAll(tasks);


        }

        private static async Task DoStuff()
        {
            for (var i = 0; i < 100; i++)
            {
                var temp = RandomGen2.Next();
                await _pipeline.SendAsync(temp).ConfigureAwait(false);
                Console.WriteLine("Just sent {0}, now waiting {1}...", new object[] { temp, System.Threading.Thread.CurrentThread.ManagedThreadId });
                await Task.Delay(RandomGen2.Next(5, 50)).ConfigureAwait(false);
                var result = await _pipeline.ReceiveAsync().ConfigureAwait(false);
                Console.WriteLine("Received {0}... {1}", new object[] { result, System.Threading.Thread.CurrentThread.ManagedThreadId });

                if (result != temp)
                {
                    var error = string.Format("************** Sent {0} But Received {1} on thread {2}", temp, result, System.Threading.Thread.CurrentThread.ManagedThreadId);
                    Console.WriteLine(error);
                    break;

                }

            }
        }

        /// <summary>
        /// Thread-Safe Random Generator
        /// </summary>
        public static class RandomGen2
        {
            private static Random _global = new Random();
            [ThreadStatic]
            private static Random _local;

            public static int Next()
            {
                return Next(0, int.MaxValue);
            }
            public static int Next(int max)
            {
                return Next(0, max);
            }
            public static int Next(int min, int max)
            {
                Random inst = _local;
                if (inst == null)
                {
                    int seed;
                    lock (_global) seed = _global.Next();
                    _local = inst = new Random(seed);
                }
                return inst.Next(min, max);
            }
        }
    }
}

There is 1 answer

Answered by i3arnon (accepted):

TransformBlock already maintains FIFO order. The order in which you post items to the block is the exact order in which the items will be returned from the block.

When you specify a maximum degree of parallelism that is larger than 1, multiple messages are processed simultaneously, and therefore, messages might not be processed in the order in which they are received. The order in which the messages are output from the block will, however, be correctly ordered.

From Dataflow (Task Parallel Library)

You can see that with this example:

private static async Task MainAsync()
{
    var transformBlock = new TransformBlock<int, int>(async input =>
    {
        await Task.Delay(RandomGen2.Next(5, 100));
        return input;
    }, new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 10});

    foreach (var number in Enumerable.Range(0,100))
    {
        await transformBlock.SendAsync(number);
    }

    for (int i = 0; i < 100; i++)
    {
        var result = await transformBlock.ReceiveAsync();
        Console.WriteLine(result);
    }
}

The output will be received in order, 0 through 99.

However, what you seem to want is some correlation with threads: a thread posts an item to the block and then receives that specific item's result. This doesn't really fit TPL Dataflow, which is meant to be a pipeline of blocks. You can hack it with BoundedCapacity = 1, but you probably shouldn't.
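One common way to get that per-caller correlation without fighting the block's shared output queue is to pair each input with its own TaskCompletionSource and complete it from inside the block. The sketch below is illustrative only (the names CorrelatedPipeline and ProcessAsync are made up for this example, not part of the question's code or the TPL Dataflow API); it shows the general pattern under the assumption that each caller should await its own result rather than a shared ReceiveAsync:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CorrelatedPipeline
{
    // Each message carries its own completion source, so the result can be
    // routed back to the caller that sent it, regardless of parallelism.
    private static readonly ActionBlock<(int Input, TaskCompletionSource<int> Tcs)> _block =
        new ActionBlock<(int Input, TaskCompletionSource<int> Tcs)>(async pair =>
        {
            await Task.Delay(50).ConfigureAwait(false); // simulated work
            pair.Tcs.SetResult(pair.Input);             // completes only this caller's task
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

    public static async Task<int> ProcessAsync(int input)
    {
        var tcs = new TaskCompletionSource<int>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        await _block.SendAsync((input, tcs)).ConfigureAwait(false);
        // Await this caller's own result; no shared receive queue to race on.
        return await tcs.Task.ConfigureAwait(false);
    }
}
```

With this shape, any number of threads can call ProcessAsync concurrently and each one gets back exactly the value it sent, while MaxDegreeOfParallelism still controls how many items are processed at once.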