Is my multi producer multi consumer approach correct?

62 views Asked by At

i didn't want to use complex constructs in System.Collection.Concurrent namespace or PLinq etc. so i decided to implement my solution to this classic problem. i think what i came up is as fast as possible (considering producing is slow and network bound in my case). but i wonder if it has any issues (deadlocks etc) so i want your opinion. here is my code

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace SyncTest
{
    static class MultiProducerMultiConsumer
    {
        static Queue<int> queue = new Queue<int>();
        static object syncRootQueue = new object();
        static object syncRootConsumer = new object();
        static ManualResetEvent rseJobReady = new ManualResetEvent(false);
        static bool nextShouldWait = true;

        public static void Start()
        {
            for (int i = 0; i < 2; i++)
            {
                new Thread(Consume).Start(); // queueing into Threadpool is not necessary because Thread wont ever return to pool. 
            }
            for (int i = 0; i < 6; i++)
            {
                new Thread(Produce).Start();
            }
        }


        static void Produce()
        {
            int threadId = Thread.CurrentThread.ManagedThreadId;
            var rnd = new Random(threadId); // ive observed all producers produce same values otherwise.
            int num;
            while (true)
            {
                Thread.Sleep(rnd.Next(600, 900));
                num = rnd.Next();
                Trace.WriteLine(threadId + " producing " + num);
                lock (syncRootQueue)
                {
                    queue.Enqueue(num);
                    if (queue.Count == 1) //which means queue was previously emtpy and there will be always a thread waiting/going to wait for signal.
                        rseJobReady.Set();
                }
            }
        }

        static void Consume()
        {
            int job;
            int threadId = Thread.CurrentThread.ManagedThreadId;
            bool lastItem = false; // only for displaying purpose. can be removed in production code.
            while (true)
            {
                lock (syncRootConsumer) //only one consumer is wanted inside this block
                {
                    if (nextShouldWait) //perf optimization to avoid blocking call to WaitOne
                        rseJobReady.WaitOne();
                    lock (syncRootQueue) //we need always synchronize accessing queue
                    {
                        job = queue.Dequeue();
                        if (nextShouldWait = lastItem = queue.Count == 0)
                        {
                            rseJobReady.Reset(); //wait for signal is required when next consumer enters outer lock
                        }
                    }
                }
                if (lastItem)
                    Trace.WriteLine(threadId + " got last item " + job);
                else
                    Trace.WriteLine(threadId + " consuming " + job);
                Thread.Sleep(new Random().Next(120, 300));
            }
        }
    }
}
0

There are 0 answers