I receive continuous data in NewMessage(Msg) that I need to offload to a thread pool/worker thread, and latency is very important. NewMessage(Msg) is a method inherited from a 3rd-party library, and I can't, or rather don't want to, change its parameters.
The frequency of Msgs received in NewMessage(Msg) is variable: it can be 300 Msgs per second or just 1. The messages also arrive continuously from the time I start the application until I end it.
I have read up on Channel and can implement it in a basic form, but I need a more complicated version that I can't find examples of.
Basic implementation thus far:
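using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

private readonly Channel<Msg> channel;
private readonly ChannelWriter<Msg> writer;
private readonly ChannelReader<Msg> reader;
private CancellationTokenSource ctSource;
private CancellationToken ct;

// Stands in for the class constructor; readonly fields can only be assigned there.
void CTor()
{
    channel = Channel.CreateUnbounded<Msg>();
    writer = channel.Writer;
    reader = channel.Reader;
    ctSource = new CancellationTokenSource();
}

void NewMessage(Msg newMsg)
{
    if (!ct.IsCancellationRequested)
    {
        // TryWrite lives on the ChannelWriter, not on the Channel itself.
        writer.TryWrite(newMsg);
    }
}

async Task ConsumeMessages(CancellationToken ct)
{
    try
    {
        // ReadAllAsync (not ReadAsync) yields an IAsyncEnumerable<Msg> for await foreach.
        await foreach (Msg newMsg in reader.ReadAllAsync(ct))
        {
            MethodConsume(newMsg);
        }
    }
    catch (OperationCanceledException)
    {
        // Expected once StopConsuming cancels the token.
    }
}

void StartConsuming()
{
    ct = ctSource.Token; // Token is a property, so no "new"
    _ = ConsumeMessages(ct);
}

void StopConsuming()
{
    ctSource?.Cancel(); // Cancel is a method; ?. skips the call if the source was never created
}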
How can I implement multiple consumers to ensure Msg consumption is optimal in terms of latency?
This may not be the exact solution, but I would suggest building on the System.Threading.Channels namespace in .NET, aiming for thread safety, FIFO processing, dynamic scaling of consumers, and monitoring of consumer efficiency.
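As a rough sketch of the multiple-consumer idea (not a definitive implementation), several reader tasks can share one unbounded channel. Everything named here is an assumption made for illustration: the MessagePump class, the Msg record standing in for the library's type, the handler delegate, and the consumerCount parameter. Note that items are dequeued in FIFO order, but with more than one consumer they are handled concurrently, so completion order is not guaranteed.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for the third-party message type.
public sealed record Msg(int Id);

// Hypothetical wrapper: one unbounded channel drained by several consumer tasks.
public sealed class MessagePump
{
    // Default options allow multiple writers and multiple readers.
    private readonly Channel<Msg> channel = Channel.CreateUnbounded<Msg>();
    private readonly CancellationTokenSource cts = new CancellationTokenSource();
    private readonly Action<Msg> handler;
    private Task[] consumers = Array.Empty<Task>();

    public MessagePump(Action<Msg> handler) => this.handler = handler;

    // Call this from the library callback. On an unbounded channel TryWrite
    // does not block; it returns false only after the writer has been completed.
    public bool NewMessage(Msg msg) => channel.Writer.TryWrite(msg);

    // Starts consumerCount independent readers on the thread pool.
    public void StartConsuming(int consumerCount)
    {
        CancellationToken token = cts.Token;
        consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Run(() => ConsumeAsync(token)))
            .ToArray();
    }

    private async Task ConsumeAsync(CancellationToken token)
    {
        try
        {
            // Each message is delivered to exactly one of the competing readers.
            await foreach (Msg msg in channel.Reader.ReadAllAsync(token))
            {
                handler(msg);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when StopConsumingAsync cancels the token.
        }
    }

    // Graceful shutdown: refuse new writes, then wait for the backlog to drain.
    public Task CompleteAsync()
    {
        channel.Writer.TryComplete();
        return Task.WhenAll(consumers);
    }

    // Hard stop: cancel the readers; queued messages may be left unprocessed.
    public Task StopConsumingAsync()
    {
        cts.Cancel();
        return Task.WhenAll(consumers);
    }
}
```

A caller would construct the pump with the consuming method, for example new MessagePump(MethodConsume), call StartConsuming with a small consumer count, and forward each NewMessage(Msg) callback to pump.NewMessage. Whether extra consumers actually reduce latency depends on how long the handler takes per message, so the count is worth measuring rather than guessing.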
You can create a producer:
Usage: