How to implement dynamic Channel where producer is 3rd party?


I receive continuous data in NewMessage(Msg) that I need to offload to a thread pool or worker thread, and latency is very important. NewMessage(Msg) is a method inherited from a 3rd party library, and I can't, or rather don't want to, change its parameters.

The frequency of the messages received by NewMessage(Msg) is variable: it can be 300 messages per second or just 1. Also the messages are continuous from the time that I start the application until I end it.

I have read up on Channel and can implement it in a basic format but I need a more complicated version that I can't find examples of.

Basic implementation thus far:

private readonly Channel<Msg> channel;
private readonly ChannelWriter<Msg> writer;
private readonly ChannelReader<Msg> reader;
private CancellationTokenSource ctSource;
private CancellationToken ct;

void CTor()
{
    channel = Channel.CreateUnbounded<Msg>();
    writer = channel.Writer;
    reader = channel.Reader;
    ctSource = new CancellationTokenSource();
}
void NewMessage(Msg newMsg)
{
    if(!ct.IsCancellationRequested)
    {       
        channel.TryWrite(newMsg);
    }
}
async void ConsumeMessages(CancellationToken ct)
{
    await foreach(Msg newMsg in channel.Reader.ReadAllAsync(ct))
    {
        if(ct.IsCancellationRequested) { return; }
        MethodConsume(newMsg);
    }
}
void StartConsuming()
{
    ct = ctSource.Token;
    ConsumeMessages(ct);
}
void StopConsuming()
{
    // Cancel is a method call; ?. guards against a missing source
    ctSource?.Cancel();
}

How can I implement multiple consumers to ensure Msg consumption is optimal in terms of latency?


There are 2 answers

Phoenix404 On

This may not be the exact solution, but I would suggest building on the System.Threading.Channels namespace in .NET, which gives you thread safety and FIFO buffering, and lets you process messages with several consumers.

You can create a processor class that wraps the producer side:

using System;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading;

public class MessageProcessor
{
    private readonly Channel<Msg> channel;
    private readonly ChannelWriter<Msg> writer;
    private Task _workerTask;
    private CancellationTokenSource ctSource;

    public MessageProcessor()
    {
        channel = Channel.CreateUnbounded<Msg>();
        writer = channel.Writer;
        ctSource = new CancellationTokenSource();
    }

    public void NewMessage(Msg newMsg)
    {
        if (!ctSource.Token.IsCancellationRequested)
        {
            writer.TryWrite(newMsg);
        }
    }

    public void Start()
    {
        _workerTask = StartProcessingAsync();
    }

    private async Task StartProcessingAsync()
    {
        var reader = channel.Reader;
        await Parallel.ForEachAsync(reader.ReadAllAsync(ctSource.Token), ctSource.Token, async (msg, token) =>
        {
            await ProcessMessageAsync(msg, token);
        });
    }

    private Task ProcessMessageAsync(Msg msg, CancellationToken token)
    {
        // Placeholder for the real work
        Console.WriteLine(msg.Content);
        return Task.CompletedTask;
    }

    public async Task StopAsync()
    {
        writer.Complete();
        await _workerTask;
        ctSource.Cancel();
    }
}

Usage:

public class Msg
{
    public string Content { get; set; }
    public DateTime Timestamp { get; set; }

    public Msg(string content)
    {
        Content = content;
        Timestamp = DateTime.UtcNow;
    }
}

class MyClass
{
    static async Task Main(string[] args)
    {
        var messageProcessor = new MessageProcessor();
        messageProcessor.Start();
        for (int i = 0; i < 100; i++)
        {
            var messageContent = $"Message {i}";
            messageProcessor.NewMessage(new Msg(messageContent));
            await Task.Delay(10);
        }
        await messageProcessor.StopAsync();
    }
}

Panagiotis Kanavos On

Processing FIX messages in a pipeline

From more comments it looks like the real scenario is processing FIX messages. In the FIX protocol the source (eg a stock exchange) sends messages like stock orders in a continuous stream. The clients must process the messages or lose them. FIX libraries invariably have quirks so the specifics matter. All of them work in a somewhat reactive manner though, pushing incoming messages to applications.

Let's start with QuickFix's example for processing incoming orders. The library is really old (11 years), based on old Java (so it has no properties, delegates or generics) and instead of callback methods, it expects clients to implement specific methods. Despite the name, FromApp is the callback that receives application messages like new orders. The MessageCracker.Crack method in turn calls the strongly-typed OnMessage callback based on the message's type :

public class MyApplication : MessageCracker, IApplication
{
    public void OnMessage(
        QuickFix.FIX42.NewOrderSingle ord,
        SessionID sessionID)
    {
        ProcessOrder(ord.Price, ord.OrderQty, ord.Account);
    }

    protected void ProcessOrder(
        Price price,
        OrderQty quantity,
        Account account)
    {
        //...
    }

    public void FromApp(Message msg, SessionID sessionID)
    {
        Crack(msg, sessionID);
    }
...
}

The orders could be processed on a different thread by using an ActionBlock<NewOrderSingle> :

public class MyApplication : MessageCracker, IApplication
{
    ActionBlock<NewOrderSingle> _block;

    public MyApplication()
    {
        _block=new ActionBlock<NewOrderSingle>(ord=>ProcessOrder(ord.Price, ord.OrderQty, ord.Account));
    }

    public void OnMessage(
        QuickFix.FIX42.NewOrderSingle ord,
        SessionID sessionID)
    {
        _block.Post(ord);
    }

This guarantees that all orders will be processed in a different thread, in order.

We could use a different block per symbol :

public class MyApplication : MessageCracker, IApplication
{
    ConcurrentDictionary<string,ActionBlock<NewOrderSingle>> _blocks;

    public MyApplication()
    {
        _blocks=new ConcurrentDictionary<string,ActionBlock<NewOrderSingle>>();
    }

    public void OnMessage(
        QuickFix.FIX42.NewOrderSingle ord,
        SessionID sessionID)
    {
        var symbol=ord.Symbol.getValue(); // Java origins, no properties        
        // The lambda parameter is named o so it doesn't clash with ord
        var block=_blocks.GetOrAdd(symbol,smb=>new ActionBlock<NewOrderSingle>(o=>ProcessOrder(o.Price, o.OrderQty, o.Account)));

        block.Post(ord);
    }
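
The per-symbol idea can be tried without a FIX library. This is only a sketch under assumptions: Order, PerSymbolDispatcher, Dispatch and CompleteAsync are hypothetical names for illustration, not QuickFix types. Each symbol gets its own ActionBlock, so orders for one symbol stay in order while different symbols are processed independently.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for a FIX order; not a QuickFix type.
public record Order(string Symbol, int Seq);

public class PerSymbolDispatcher
{
    private readonly ConcurrentDictionary<string, ActionBlock<Order>> _blocks = new();
    private readonly Action<Order> _handler;

    public PerSymbolDispatcher(Action<Order> handler) => _handler = handler;

    // Routes the order to the block that owns its symbol.
    public void Dispatch(Order order)
    {
        // Under a race the factory may run twice; the extra block is simply
        // discarded, because GetOrAdd always returns the stored instance.
        var block = _blocks.GetOrAdd(order.Symbol,
            _ => new ActionBlock<Order>(_handler));
        block.Post(order);
    }

    // Signals every block to stop accepting input and waits for pending orders.
    public Task CompleteAsync()
    {
        foreach (var block in _blocks.Values)
        {
            block.Complete();
        }
        return Task.WhenAll(_blocks.Values.Select(b => b.Completion));
    }
}
```

Each block keeps the default of one worker, which is what preserves the order within a symbol.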

Second Answer

From the comments :

I am not in control of the producer - that is an interface I inherit and I must work with NewMessage(Msg) within my class.

In that case it's probably easier to use ActionBlock, which encapsulates buffering, parallelism and ordered processing. Items posted to an ActionBlock are processed by a single background worker by default, but the number can be raised through the MaxDegreeOfParallelism option of ExecutionDataflowBlockOptions.
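
To show how the worker count is changed, here is a minimal sketch, assuming the TPL Dataflow library (System.Threading.Tasks.Dataflow) is available. ParallelBlockDemo, RunAsync and the Task.Delay stand-in are hypothetical names for illustration and are not part of the question's code.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ParallelBlockDemo
{
    // Posts `count` integers to an ActionBlock that runs up to `workers`
    // handlers concurrently, then returns how many were processed.
    public static async Task<int> RunAsync(int count, int workers)
    {
        int processed = 0;

        var options = new ExecutionDataflowBlockOptions
        {
            // The default is 1: one message at a time, in order.
            MaxDegreeOfParallelism = workers
        };

        var block = new ActionBlock<int>(async item =>
        {
            await Task.Delay(1);                  // stand-in for real work
            Interlocked.Increment(ref processed); // thread-safe counter
        }, options);

        for (int i = 0; i < count; i++)
        {
            block.Post(i); // returns immediately; the block buffers the item
        }

        block.Complete();       // no more input
        await block.Completion; // wait for buffered items to finish
        return processed;
    }
}
```

With more than one worker the messages are no longer guaranteed to finish in arrival order, so this only fits when the messages are independent of each other.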

Assuming the interface only has a NewMessage(Msg) method and the class is disposable, the block can be created in the constructor and stopped in Dispose or DisposeAsync. NewMessage only has to post the message to the block :

class MyConsumerClass:IExternalConsumer,IAsyncDisposable
{
    ActionBlock<Msg> _block;
    ILogger<MyConsumerClass> _logger;
    
    public MyConsumerClass(ILogger<MyConsumerClass> logger )
    {
        _block=new ActionBlock<Msg>(ProcessMessage);
        _logger=logger;
    }

    public void NewMessage(Msg msg)
    {
        _block.Post(msg);
    }

    async Task ProcessMessage(Msg msg)
    {
        try
        {
            // Process the message
        }
        //Uncaught exceptions will terminate the block
        catch(Exception exc)
        {       
            _logger.LogError(exc,"...");
        }

    }

...
}

The DisposeAsync method stops the block and waits for pending messages to complete. This snippet omits the full disposal pattern and just shows the minimum code :

    async Task StopBlockAsync()
    {
        _block.Complete();
        await _block.Completion;
    }
    public async ValueTask DisposeAsync()
    {
        //Standard IAsyncDisposable implementation omitted
        await StopBlockAsync();
    }

Original Answer

Using channels is far easier than this. There's no need for fields and CancellationToken is only needed if we want to stop producing, consuming and discard all pending messages.

All a producer needs to do is return a ChannelReader, nothing more :

ChannelReader<Msg> Produce(CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<Msg>();
    var writer=channel.Writer;

    //Emulate an actual producer
    _ = Task.Run(async ()=>{
        while(!token.IsCancellationRequested)
        {
            await Task.Delay(100);
            //Writing may wait if we use a Bounded channel that's full
            await writer.WriteAsync( new Msg(....),token);

        }
    },token)
    //Ensure we `Complete` even in case of error, propagating the error
    .ContinueWith(t=>{
        writer.TryComplete(t.Exception);
    });
    return channel.Reader;
}

Keeping the Channel inside Produce means the method has full control of its lifetime, completion and possibly error handling. This makes consuming, or composing multiple steps, a lot easier.
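
The comment in Produce mentions bounded channels. When the producer is a callback that must never wait, one option is a bounded channel whose full mode drops old items. The sketch below is only an illustration: BoundedDemo and its method names are hypothetical, and whether dropping messages is acceptable depends entirely on the application.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;

public static class BoundedDemo
{
    // Capacity 3 with DropOldest: TryWrite always succeeds, and when the
    // channel is full the oldest buffered item is discarded to make room.
    public static List<int> RunDropOldest()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
        {
            FullMode = BoundedChannelFullMode.DropOldest
        });

        for (int i = 1; i <= 5; i++)
        {
            channel.Writer.TryWrite(i);
        }
        channel.Writer.Complete();

        var kept = new List<int>();
        while (channel.Reader.TryRead(out var item))
        {
            kept.Add(item);
        }
        return kept; // 3, 4, 5
    }

    // The default full mode is Wait: TryWrite does not block, it returns
    // false when there is no room.
    public static bool SecondTryWriteSucceeds()
    {
        var channel = Channel.CreateBounded<int>(1);
        channel.Writer.TryWrite(1);
        return channel.Writer.TryWrite(2); // false, the channel is full
    }
}
```

An unbounded channel never rejects or drops a write, at the cost of unlimited memory growth if the consumer falls behind.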

The consumer doesn't need access to the Channel itself, only the reader.

async Task Consume(ChannelReader<Msg> reader, CancellationToken token=default)
{
    await foreach(var msg in reader.ReadAllAsync(token))
    {
        //Extra check if we want to exit early
        if (token.IsCancellationRequested) return;

        //Process the message
        ....
    }
}

This allows for easy composition:

var reader=Produce();
await Consume(reader);

Once Produce ends, completion will be propagated to Consume. We can terminate prematurely if we pass an optional CancellationToken :

//Cancel after 1 minute
var cts=new CancellationTokenSource(60000);
var reader=Produce(cts.Token);
await Consume(reader,cts.Token);
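
Because a consumer only needs the reader, several consumer loops can share one reader, which is one way to get the multiple consumers the question asks about. A minimal sketch, where MultiConsumerDemo, ConsumeAll and RunAsync are hypothetical names for illustration:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class MultiConsumerDemo
{
    // Starts `consumers` loops over the same reader and waits for all of them.
    public static Task ConsumeAll<T>(ChannelReader<T> reader, int consumers,
        Action<T> handle, CancellationToken token = default)
    {
        var tasks = Enumerable.Range(0, consumers).Select(_ => Task.Run(async () =>
        {
            // Each item is delivered to exactly one of the competing loops.
            await foreach (var item in reader.ReadAllAsync(token))
            {
                handle(item);
            }
        }, token));
        return Task.WhenAll(tasks);
    }

    // Writes 0..count-1, consumes with several loops, returns the items sorted.
    public static async Task<int[]> RunAsync(int count, int consumers)
    {
        var channel = Channel.CreateUnbounded<int>();
        var seen = new ConcurrentBag<int>();

        var consuming = ConsumeAll(channel.Reader, consumers, seen.Add);

        for (int i = 0; i < count; i++)
        {
            channel.Writer.TryWrite(i);
        }
        channel.Writer.Complete(); // lets every consumer loop finish

        await consuming;
        return seen.OrderBy(x => x).ToArray();
    }
}
```

Every message is handled once, but not in arrival order across loops, so this suits messages that can be processed independently.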

Create a Pipeline

The processing method could return its own ChannelReader with the results of processing. This would allow creating a pipeline of processing steps/blocks

ChannelReader<Msg2> Convert2(ChannelReader<Msg> reader, CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<Msg2>();
    var writer=channel.Writer;

    _ = Task.Run(async ()=>{
        await foreach(var msg in reader.ReadAllAsync(token))
        {
            var msg2= SomehowProduceMsg2(msg);
            await writer.WriteAsync( msg2,token);
        }
    },token)
    //Ensure we `Complete` even in case of error, propagating the error
    .ContinueWith(t=>{
        writer.TryComplete(t.Exception);
    });
    return channel.Reader;
}

This way we can create a pipeline of processing steps:

var reader=Produce();
var reader2=Convert2(reader);
await Consume2(reader2);

Reducing boilerplate

The code starts getting too verbose but luckily, it's easy to make a generic Convert method that calls a Func<T1,T2> we pass as a parameter :

ChannelReader<T2> Convert<T1,T2>(ChannelReader<T1> reader, Func<T1,T2> func, CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<T2>();
    var writer=channel.Writer;

    _ = Task.Run(async ()=>{
        await foreach(var msg in reader.ReadAllAsync(token))
        {
            var msg2= func(msg);
            await writer.WriteAsync( msg2,token);
        }
    },token)
    //Ensure we `Complete` even in case of error, propagating the error
    .ContinueWith(t=>{
        writer.TryComplete(t.Exception);
    });
    return channel.Reader;
}

...

var reader2=Convert(reader1,SomehowProduceMsg2,token);

Parallel Processing

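An easy way to process more than one message at a time is to use Parallel.ForEachAsync instead of await foreach, with the degree of parallelism we want. With a degree above 1, results may reach the output channel in a different order than the input arrived: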

ChannelReader<Msg2> Convert2(ChannelReader<Msg> reader, int dop=1,CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<Msg2>();
    var writer=channel.Writer;

    _ = Task.Run(async ()=>{

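        //The token is passed through ParallelOptions, and the body must be async
        var options=new ParallelOptions{ MaxDegreeOfParallelism = dop, CancellationToken = token};
        await Parallel.ForEachAsync(reader.ReadAllAsync(token),options,async (msg,t)=>{
            var msg2= SomehowProduceMsg2(msg);
            await writer.WriteAsync( msg2,t);
        });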
    },token)
    //Ensure we `Complete` even in case of error, propagating the error
    .ContinueWith(t=>{
        writer.TryComplete(t.Exception);
    });
    return channel.Reader;
}

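Or, as a generic method that takes the degree of parallelism as a parameter :

ChannelReader<T2> Convert<T1,T2>(ChannelReader<T1> reader, Func<T1,T2> func, int dop, CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<T2>();
    var writer=channel.Writer;

    _ = Task.Run(async ()=>{
        var options=new ParallelOptions{ MaxDegreeOfParallelism = dop, CancellationToken = token};
        await Parallel.ForEachAsync(reader.ReadAllAsync(token),options,async (msg,t)=>{
            await writer.WriteAsync( func(msg),t);
        });
    },token)
    //Ensure we `Complete` even in case of error, propagating the error
    .ContinueWith(t=>{
        writer.TryComplete(t.Exception);
    });
    return channel.Reader;
}

var reader2=Convert<Msg,Msg2>(reader1,SomehowProduceMsg2,dop,token);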