Is there a Dataflow TransformBlock that receives two input arguments?

1k views Asked by At

I have a delegate that takes two numbers and creates a System.Windows.Point from them:

(x, y) => new Point(x,y);

I want to learn how can I use TPL Dataflow, specifically TransformBlock, to perform that.

I would have something like this:

ISourceBlock<double> Xsource;
ISourceBlock<double> Ysource;

ITargetBlock<Point> PointTarget;

// is there such a thing?
TransformBlock<double, double, Point> PointCreatorBlock;

// and also, how should I wire them together?

UPDATE:

Also, how can I assemble a network that joins more than two arguments? For example, let's say I have a method that receives eight arguments, each one coming from a different buffer, how can I create a block that knows when every argument has one instance available so that the object can be created?

1

There are 1 answers

0
JSteward On BEST ANSWER

I think what your looking for is the join block. Currently there is a two input and a three input variant, each outputs a tuple. These could be combined to create an eight parameter result. Another method would be creating a class to hold the parameters and using various block to process and construct the parameters class.

For the simple example of combining two ints for a point:

class MyClass {

    BufferBlock<int> Xsource;
    BufferBlock<int> Ysource;
    JoinBlock<int, int> pointValueSource;
    TransformBlock<Tuple<int, int>, Point> pointProducer;

    public MyClass() {
        CreatePipeline();
        LinkPipeline();
    }

    private void CreatePipeline() {
        Xsource = new BufferBlock<int>();

        Ysource = new BufferBlock<int>();

        pointValueSource = new JoinBlock<int, int>(new GroupingDataflowBlockOptions() {
            Greedy = false
        });

        pointProducer = new TransformBlock<Tuple<int, int>, Point>((Func<Tuple<int,int>,Point>)ProducePoint, 
            new ExecutionDataflowBlockOptions()
            { MaxDegreeOfParallelism = Environment.ProcessorCount });
    }

    private void LinkPipeline() {
        Xsource.LinkTo(pointValueSource.Target1, new DataflowLinkOptions() {
            PropagateCompletion = true
        });

        Ysource.LinkTo(pointValueSource.Target2, new DataflowLinkOptions() {
            PropagateCompletion = true
        });

        pointValueSource.LinkTo(pointProducer, new DataflowLinkOptions() {
            PropagateCompletion = true
        });

        //pointProduce.LinkTo(Next Step In processing)
    }

    private Point ProducePoint(Tuple<int, int> XandY) {
        return new Point(XandY.Item1, XandY.Item2);
    }
}

The JoinBlock will wait until it has data available on both of its input buffers to produce an output. Also, note that in this case if X's and Y's are arriving out of order at the input buffers care needs to be taken to re-sync them. The join block will only combine the first X and the first Y value it receives and so on.