i didn't want to use complex constructs in System.Collection.Concurrent namespace or PLinq etc. so i decided to implement my solution to this classic problem. i think what i came up is as fast as possible (considering producing is slow and network bound in my case). but i wonder if it has any issues (deadlocks etc) so i want your opinion. here is my code
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
namespace SyncTest
{
static class MultiProducerMultiConsumer
{
static Queue<int> queue = new Queue<int>();
static object syncRootQueue = new object();
static object syncRootConsumer = new object();
static ManualResetEvent rseJobReady = new ManualResetEvent(false);
static bool nextShouldWait = true;
public static void Start()
{
for (int i = 0; i < 2; i++)
{
new Thread(Consume).Start(); // queueing into Threadpool is not necessary because Thread wont ever return to pool.
}
for (int i = 0; i < 6; i++)
{
new Thread(Produce).Start();
}
}
static void Produce()
{
int threadId = Thread.CurrentThread.ManagedThreadId;
var rnd = new Random(threadId); // ive observed all producers produce same values otherwise.
int num;
while (true)
{
Thread.Sleep(rnd.Next(600, 900));
num = rnd.Next();
Trace.WriteLine(threadId + " producing " + num);
lock (syncRootQueue)
{
queue.Enqueue(num);
if (queue.Count == 1) //which means queue was previously emtpy and there will be always a thread waiting/going to wait for signal.
rseJobReady.Set();
}
}
}
static void Consume()
{
int job;
int threadId = Thread.CurrentThread.ManagedThreadId;
bool lastItem = false; // only for displaying purpose. can be removed in production code.
while (true)
{
lock (syncRootConsumer) //only one consumer is wanted inside this block
{
if (nextShouldWait) //perf optimization to avoid blocking call to WaitOne
rseJobReady.WaitOne();
lock (syncRootQueue) //we need always synchronize accessing queue
{
job = queue.Dequeue();
if (nextShouldWait = lastItem = queue.Count == 0)
{
rseJobReady.Reset(); //wait for signal is required when next consumer enters outer lock
}
}
}
if (lastItem)
Trace.WriteLine(threadId + " got last item " + job);
else
Trace.WriteLine(threadId + " consuming " + job);
Thread.Sleep(new Random().Next(120, 300));
}
}
}
}