ConcurrentQueue with multithreading


I am new to multithreading concepts. I need to add a certain number of strings to a queue and process them with multiple threads. I am using ConcurrentQueue, which is thread-safe.

This is what I have tried, but not all of the items added to the concurrent queue are processed. Only the first 4 items are processed.

class Program
{
    ConcurrentQueue<string> iQ = new ConcurrentQueue<string>();
    static void Main(string[] args)
    {
        new Program().run();
    }

    void run()
    {
        int threadCount = 4;
        Task[] workers = new Task[threadCount];

        for (int i = 0; i < threadCount; ++i)
        {
            int workerId = i;
            Task task = new Task(() => worker(workerId));
            workers[i] = task;
            task.Start();
        }

        for (int i = 0; i < 100; i++)
        {
            iQ.Enqueue("Item" + i);
        }

        Task.WaitAll(workers);
        Console.WriteLine("Done.");

        Console.ReadLine();
    }

    void worker(int workerId)
    {
        Console.WriteLine("Worker {0} is starting.", workerId);
        string op;
        if(iQ.TryDequeue(out op))
        {
            Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
        }

        Console.WriteLine("Worker {0} is stopping.", workerId);
    }


}
3

There are 3 answers

7
Sefe On BEST ANSWER

There are a couple of issues with your implementation. The first and obvious one is that the worker method only dequeues zero or one item and then stops:

    if(iQ.TryDequeue(out op))
    {
        Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    }

It should be:

    while(iQ.TryDequeue(out op))
    {
        Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    }

That, however, won't be enough to make your program work properly. Your workers are started before anything is enqueued, so if they dequeue faster than the main thread enqueues, they will find the queue empty and stop while the main thread is still enqueueing. You need to signal the workers that they can stop. You can define a boolean field (here called doneEnqueueing) that is set to true once enqueueing is done:

for (int i = 0; i < 100; i++)
{
    iQ.Enqueue("Item" + i);
}
Volatile.Write(ref doneEnqueueing, true);

The workers will check the value:

void worker(int workerId)
{
    Console.WriteLine("Worker {0} is starting.", workerId);
    do
    {
        string op;
        // Drain whatever is currently in the queue.
        while (iQ.TryDequeue(out op))
        {
            Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
        }
        // Wait until more items arrive or the producer signals that it is done.
        SpinWait.SpinUntil(() => Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0));
    }
    while (!Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0));
    Console.WriteLine("Worker {0} is stopping.", workerId);
}
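For reference, here is a minimal sketch of how the pieces above might fit together in one program. It is an illustration under assumptions, not the asker's exact code: the class name QueueDemo, the Run method and the processed counter are hypothetical names added only so the result can be checked, Task.Run stands in for new Task(...) plus Start(), and the loop checks IsEmpty rather than Count > 0.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class QueueDemo
{
    private readonly ConcurrentQueue<string> iQ = new ConcurrentQueue<string>();
    private bool doneEnqueueing;   // set to true once the producer has finished
    private int processed;         // hypothetical counter, only used to check the result

    // Starts the workers, enqueues itemCount items and returns how many were processed.
    public int Run(int threadCount, int itemCount)
    {
        Task[] workers = new Task[threadCount];
        for (int i = 0; i < threadCount; ++i)
        {
            int workerId = i;
            workers[i] = Task.Run(() => Worker(workerId));
        }

        for (int i = 0; i < itemCount; i++)
        {
            iQ.Enqueue("Item" + i);
        }
        // Signal the workers that no more items will arrive.
        Volatile.Write(ref doneEnqueueing, true);

        Task.WaitAll(workers);
        return processed;
    }

    private void Worker(int workerId)
    {
        do
        {
            string op;
            // Drain whatever is currently in the queue.
            while (iQ.TryDequeue(out op))
            {
                Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
                Interlocked.Increment(ref processed);
            }
            // Wait until more items arrive or the producer is done.
            SpinWait.SpinUntil(() => Volatile.Read(ref doneEnqueueing) || !iQ.IsEmpty);
        }
        while (!Volatile.Read(ref doneEnqueueing) || !iQ.IsEmpty);
    }

    static void Main()
    {
        int count = new QueueDemo().Run(4, 100);
        Console.WriteLine("Processed {0} items.", count);
    }
}
```

Because the flag is written only after the last Enqueue, a worker that reads it as true and then finds the queue empty can stop safely: nothing more will be added.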
5
Maksim Simkin On

Your workers take one item out of the queue and then finish. Just let them keep working until the queue is empty.

Replace the if in the worker function with while:

void worker(int workerId)
{
    Console.WriteLine("Worker {0} is starting.", workerId);
    string op;
    while (iQ.TryDequeue(out op))
    {
        Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    }

    Console.WriteLine("Worker {0} is stopping.", workerId);
}

When you run it, you will see that nearly all items are processed by two workers. The likely reason: your CPU has two cores, both are busy, and there is no "free time slot" in which to start a new task. If you want all 4 of your tasks to process items, you could add a delay to give your processor time to start the other tasks, something like:

while (iQ.TryDequeue(out op))
{
    Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    Task.Delay(TimeSpan.FromMilliseconds(1)).Wait();
}

That gives you the output you want:

...
Worker 0 is processing item Item8
Worker 1 is processing item Item9
Worker 2 is processing item Item10
Worker 3 is processing item Item11
Worker 3 is processing item Item13
Worker 1 is processing item Item12
...
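If you want to see how the work is actually split on your machine, a small sketch like the following could count the items each worker dequeues, with and without the delay. The names WorkerShare and CountPerWorker are hypothetical, and the queue is filled before the workers start so that the simple while loop is enough. The split depends on your hardware and the scheduler, so no particular numbers are implied.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

static class WorkerShare
{
    // Returns how many items each worker dequeued; delayMs > 0 adds a per-item delay.
    public static int[] CountPerWorker(int threadCount, int itemCount, int delayMs)
    {
        var queue = new ConcurrentQueue<string>();
        for (int i = 0; i < itemCount; i++)
        {
            queue.Enqueue("Item" + i);   // fill the queue before any worker starts
        }

        int[] counts = new int[threadCount];
        Task[] workers = new Task[threadCount];
        for (int i = 0; i < threadCount; ++i)
        {
            int workerId = i;
            workers[i] = Task.Run(() =>
            {
                string op;
                while (queue.TryDequeue(out op))
                {
                    counts[workerId]++;   // each worker writes only its own slot
                    if (delayMs > 0)
                    {
                        Task.Delay(TimeSpan.FromMilliseconds(delayMs)).Wait();
                    }
                }
            });
        }
        Task.WaitAll(workers);
        return counts;
    }

    static void Main()
    {
        Console.WriteLine("No delay:   " + string.Join(", ", CountPerWorker(4, 100, 0)));
        Console.WriteLine("1 ms delay: " + string.Join(", ", CountPerWorker(4, 100, 1)));
    }
}
```

Whatever the split turns out to be, the per-worker counts always add up to the number of items enqueued.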
0
Jesse Williams On

I've actually been working with ConcurrentQueue quite a bit recently and thought I'd share this. I've created a custom ConcurrentQueue called CQItems that has methods to build itself with given parameters. Internally, when you tell it to build x number of y items, it makes a Parallel.For call to the item constructors. The benefit here is that when a method or function calls CQItems myCQ = CQItems.Generate(x, y), that call comes from the base application thread, meaning that nothing can look at the queue until it has finished building. But internally to the queue class, it's building with threads, and is significantly quicker than just using a List<> or Queue<>. Mostly it's generating things out of thin air, but sometimes (based on params) it creates items from SQL, basically generating objects based on existing data. At any rate, these are the two methods in the CQItems class that can help with this:

    public void Generate(int numberOfItems = 1, ItemOptions options = ItemOptions.NONE)
    {
        try
        {
            Type t = typeof(T);

            if (t == typeof(Item))
                throw new ArgumentException("Type " + t + " is not valid for generation.  Please contact QA.");

            else
                Parallel.For(0, numberOfItems, (i, loopState) =>
                {
                    try
                    {
                        GenerateItemByType(typeof(T), options);
                    }

                    catch
                    {
                        loopState.Stop();
                        throw;
                    }

                });
        }

        catch (AggregateException ae)
        {
            ae.Handle((x) =>
            {
                if (x is SQLNullResultsException)
                {
                    throw x;
                }
                else if (x is ImageNotTIFFException)
                {
                    throw x;
                }
                else
                {
                    throw x;
                }

                return true;
            });
        }

        catch
        {
            throw;
        }

        finally
        {
            ItemManager.Instance.Clear();
        }
    }

    private void GenerateItemByType(Type t, ItemOptions options = ItemOptions.NONE)
    {
        try
        {
            if (t == typeof(ItemA))
            {
                if ((options & ItemOptions.DUPLICATE) != 0)
                {
                    this.Enqueue(new ItemB(options));
                }
                else
                {
                    this.Enqueue(new ItemA(options));
                }
            }
            else if (t == typeof(ItemC))
            {
                this.Enqueue(new ItemC(options));
            }
        }

        catch
        {
            throw;
        }

        finally { }
    }
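Stripped of the project-specific types, the core idea of filling a ConcurrentQueue from a Parallel.For might look roughly like this. It is only a sketch: ParallelFill and its Generate method are hypothetical stand-ins, and plain strings replace the Item classes used above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

static class ParallelFill
{
    // Builds a queue of numberOfItems strings, creating the items on multiple threads.
    public static ConcurrentQueue<string> Generate(int numberOfItems)
    {
        var queue = new ConcurrentQueue<string>();
        Parallel.For(0, numberOfItems, i =>
        {
            queue.Enqueue("Item" + i);   // Enqueue is safe to call concurrently
        });
        // Parallel.For has returned, so every item is already in the queue.
        return queue;
    }

    static void Main()
    {
        ConcurrentQueue<string> queue = Generate(1000);
        Console.WriteLine(queue.Count);   // prints 1000
    }
}
```

Note that the items end up in whatever order the threads happened to enqueue them, not necessarily Item0, Item1, Item2 and so on.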

Some useful notes:

Supplying the loopState variable in the Parallel.For() allows us to set the state to stop if an exception is caught. This is nice because otherwise, if your loop is asked to do 1000 things and the 5th iteration throws an exception, it's going to keep looping. You may want it to, but in my case an exception needs to exit the threaded loop. You'll still end up with an AggregateException coming out of it (apparently, that's just what happens when threads throw exceptions). Parsing those out and only sending the first one can save a LOT of time and headaches trying to weed through a giant exception group where later exceptions may (or may not) have been caused by the first anyway.

As for the rethrows, I try to add a catch statement for most expected types of exceptions even if I plan to just throw them up the stack anyway. Part of this is for troubleshooting (being able to break on specific exceptions can be handy). Part of it is because sometimes I want to be able to do other things, such as stopping the loop, changing or adding to the exception message, or in the case of breaking apart the AggregateException, only send one exception back up the stack rather than the whole aggregate. Just a point of clarification for anyone who might be looking at this.
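To make the stop-and-unwrap pattern concrete, here is a small self-contained sketch. It is not the CQItems code: FirstFailure, RunAndGetFirstError and the failAt parameter are hypothetical, and an InvalidOperationException stands in for whatever a real iteration might throw.

```csharp
using System;
using System.Threading.Tasks;

static class FirstFailure
{
    // Runs count iterations in parallel; iteration failAt throws (a made-up failure).
    // Returns the first inner exception, or null if nothing failed.
    public static Exception RunAndGetFirstError(int count, int failAt)
    {
        try
        {
            Parallel.For(0, count, (i, loopState) =>
            {
                try
                {
                    if (i == failAt)
                    {
                        throw new InvalidOperationException("Iteration " + i + " failed.");
                    }
                }
                catch
                {
                    loopState.Stop();   // ask the loop not to start further iterations
                    throw;              // rethrow so the failure still reaches the caller
                }
            });
            return null;
        }
        catch (AggregateException ae)
        {
            // Flatten nested aggregates and hand back only the first inner exception.
            return ae.Flatten().InnerExceptions[0];
        }
    }

    static void Main()
    {
        Exception first = RunAndGetFirstError(1000, 5);
        Console.WriteLine(first == null ? "No errors." : first.Message);
    }
}
```

Returning the exception instead of rethrowing it is just a convenience for the demo. In real code you would more likely throw the extracted exception up the stack, as described above.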

Lastly, in case it's confusing, the type T comes from my CQItems class itself:

     public class CQItems<T> : ConcurrentQueue<Item>, IDisposable