TPL Dataflow pipe constantly running inside service

844 views Asked by At

For example, I have three blocks:

       Buffer -> Transform -> Action 

I'm running webapi service which brings data from requests to the buffer block. How to create such a pipe which will be running forever without invoking Completion() at Action block and stoping whole pipe.

2

There are 2 answers

2
JSteward On BEST ANSWER

If you need the pipeline to remain for the lifetime of the application and not just the request you could use a static class to hold it. There's not necessarily any need to call complete on the action block. Another option depending on your needs would be to separate the application and the processing pipeline. These could be separated by a database message queue or just separate server side applications.

@svick has a good point using a TaskCompletionSource to determine when the pipeline has finished with a particular item. Putting it all together here's a quick sample that might be helpful:

public class Controller {

    public async Task<int> PostToPipeline(int inputValue) {
        var message = new MessageIn(inputValue);
        MyPipeline.InputBuffer.Post(message);
        return await message.Completion.Task;
    }
}

public class MessageIn {
    public MessageIn(int value) {
        InputValue = value;
        Completion = new TaskCompletionSource<int>();
    }

    public int InputValue { get; set; }
    public TaskCompletionSource<int> Completion { get; set; }
}

public class MessageProcessed {
    public int ProcessedValue { get; set; }
    public TaskCompletionSource<int> Completion { get; set; }
}

public static class MyPipeline {

    public static BufferBlock<MessageIn> InputBuffer { get; private set; }
    private static TransformBlock<MessageIn, MessageProcessed> transform;
    private static ActionBlock<MessageProcessed> action;

    static MyPipeline() {
        BuildPipeline();
        LinkPipeline();

    }

    static void BuildPipeline() {
        InputBuffer = new BufferBlock<MessageIn>();

        transform = new TransformBlock<MessageIn, MessageProcessed>((Func<MessageIn, MessageProcessed>)TransformMessage, new ExecutionDataflowBlockOptions() {
            MaxDegreeOfParallelism = Environment.ProcessorCount,
            BoundedCapacity = 10
        });

        action = new ActionBlock<MessageProcessed>((Action<MessageProcessed>)CompletedProcessing, new ExecutionDataflowBlockOptions() {
            MaxDegreeOfParallelism = Environment.ProcessorCount,
            BoundedCapacity = 10
        });
    }

    static void LinkPipeline() {
        InputBuffer.LinkTo(transform, new DataflowLinkOptions() { PropagateCompletion = true });
        transform.LinkTo(action, new DataflowLinkOptions() { PropagateCompletion = true });
    }

    static MessageProcessed TransformMessage(MessageIn message) {
        return new MessageProcessed() {
            ProcessedValue = message.InputValue++,
            Completion = message.Completion
        };
    }

    static void CompletedProcessing(MessageProcessed message) {
        message.Completion.SetResult(message.ProcessedValue);
    }
} 

There a few ways to coordinate the completion of a specific job within the pipeline; awaiting the completion source may be the best approach for your needs.

0
svick On

Dataflow doesn't have a great solution for getting the output of a pipeline for certain input (because it supports more than just simple pipelines).

What you can do to work around that is to create a TaskCompletionSource<T> and send it along with the input to the pipeline. Each block in the pipeline sends it to the next block and the last one calls its SetResult().

The code that sends the input to the pipeline can then await the TaskCompletionSource's Task to wait for the output of the pipeline.