Try Dequeue in ConcurrentQueue

TryDequeue in ConcurrentQueue<T> returns false if there are no items in the queue.

If the queue is empty, I need the consumer to wait until a new item is added, dequeue that item, and then keep repeating this process.

Should I use Monitor.Enter, Wait, and Pulse, or is there a better option in C# 4.0?

There are 3 answers

Damien_The_Unbeliever On BEST ANSWER

Isn't this what BlockingCollection is designed for?

As I understand it, you can wrap your ConcurrentQueue with one of these, and then call Take.
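
A minimal sketch of that idea, assuming one producer and one consumer. The class name, the item values, and the 100 ms delay are made up for illustration. Take blocks the calling thread while the collection is empty, so no Monitor.Wait/Pulse plumbing is needed:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class BlockingTakeDemo
{
    static void Main()
    {
        // Wrap a ConcurrentQueue<T> so items come out in FIFO order.
        using (var queue = new BlockingCollection<int>(new ConcurrentQueue<int>()))
        {
            // Hypothetical producer: adds three items with a short delay between them.
            Task producer = Task.Factory.StartNew(() =>
            {
                for (int i = 1; i <= 3; i++)
                {
                    Thread.Sleep(100);
                    queue.Add(i);
                }
            });

            // Take() blocks while the queue is empty and returns once an item arrives.
            for (int n = 0; n < 3; n++)
            {
                int item = queue.Take();
                Console.WriteLine("Dequeued {0}", item);
            }

            // Make sure the producer has finished before the collection is disposed.
            producer.Wait();
        }
    }
}
```

If the consumer should also be able to stop, the producer can call CompleteAdding and the consumer can iterate over GetConsumingEnumerable instead of calling Take in a fixed-count loop.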

Tomas Walek On

You could periodically check the number of elements in the queue and, when it is greater than zero, signal the dequeuing thread (e.g. with a ManualResetEvent). That thread then dequeues elements until the queue is empty.

Here is the pseudocode for this:

Check Thread:

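while (true)
{
  int queueLength = 0;
  lock (Queue)
  {
    // Queue<T> and ConcurrentQueue<T> expose Count, not Length.
    queueLength = Queue.Count;
  }

  // Use the value read under the lock instead of reading the queue again.
  if (queueLength > 0)
  {
    manualResetEvent.Set();   // wake the dequeue thread
  }
  else
  {
    Thread.Sleep(...);
  }
}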

Dequeue Thread:

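while (true)
{
  if (manualResetEvent.WaitOne(timeout))
  {
    // Reset before draining; otherwise the event stays signaled
    // and WaitOne never blocks again.
    manualResetEvent.Reset();
    DequeueUntilQueueEmpty();
  }
}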

Consider using a lock in DequeueUntilQueueEmpty, too.
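
DequeueUntilQueueEmpty is not shown above. One possible shape, assuming Queue is a ConcurrentQueue<string>; the class name and the process callback are hypothetical. With a ConcurrentQueue<T>, TryDequeue is already thread-safe, so the drain loop itself needs no lock. A lock would be needed with a plain Queue<T>.

```csharp
using System;
using System.Collections.Concurrent;

class DrainDemo
{
    // Assumed to be the shared queue from the pseudocode.
    static readonly ConcurrentQueue<string> Queue = new ConcurrentQueue<string>();

    // Removes items until TryDequeue reports that the queue is empty.
    // Returns the number of items handed to the callback.
    static int DequeueUntilQueueEmpty(Action<string> process)
    {
        int count = 0;
        string item;
        while (Queue.TryDequeue(out item))
        {
            process(item);
            count++;
        }
        return count;
    }

    static void Main()
    {
        Queue.Enqueue("a");
        Queue.Enqueue("b");

        int handled = DequeueUntilQueueEmpty(s => Console.WriteLine("Processing {0}", s));
        Console.WriteLine("Handled {0} item(s)", handled);
    }
}
```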

Yair Levi On

You can use BlockingCollection.

Do something like this:

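using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

private BlockingCollection<string> rowsQueue;

private void ProcessFiles()
{
    // Bounded to 1000 items. ConcurrentBag does not preserve insertion order;
    // pass a ConcurrentQueue<string> instead if FIFO order matters.
    this.rowsQueue = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000);
    ReadFiles(new List<string>() { "file1.txt", "file2.txt" });

    // GetConsumingEnumerable blocks while the collection is empty and ends
    // once CompleteAdding has been called and every item has been consumed.
    // Checking IsCompleted and then calling Take() is racy: Take() throws
    // InvalidOperationException if adding completes in between.
    foreach (string line in this.rowsQueue.GetConsumingEnumerable())
    {
        // Do something
    }
}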

private Task ReadFiles(List<string> fileNames)
{
    Task task = new Task(() =>
    {
        try
        {
            Parallel.ForEach(
                fileNames,
                new ParallelOptions { MaxDegreeOfParallelism = 10 },
                (fileName) =>
                {
                    using (StreamReader sr = File.OpenText(fileName))
                    {
                        string line;
                        while ((line = sr.ReadLine()) != null)
                        {
                            // Blocks while the bounded collection is full.
                            this.rowsQueue.Add(line);
                        }
                    }
                });
        }
        finally
        {
            // Always signal completion so the consumer can finish,
            // even if reading a file throws.
            this.rowsQueue.CompleteAdding();
        }
    });

    task.Start();

    return task;
}