Parallel.For in a List, acting upon the items in sequential order, only in the start

1.9k views Asked by At

I have a List<TaskClass> TaskList items that we can iterate over using a Parallel loop.

The items in the list are sorted in a particular order as the TaskClass implements IComparable with its own CompareTo(object obj) method. Thus we need the items acted upon in sequential order.

Note they do NOT have to complete in sequential order, just start in sequential.

Thus TaskList[0] should be started first; then TaskList[1], TaskList[2], ... However, we don't care if TaskList[2] completes first, or TaskList[0].

This is the quick code I've come up with to try and alleviate this:

//Construct a ConcurrentQueue and populate it with our SORTED list
//of TaskClass items so when we go through a parallel loop
//they are acted upon in sorted order. A parallel loop does not
//guarantee ordering, which we need to make sure tasks with a higher
//number are done first.
ConcurrentQueue<TaskClass> cq = new ConcurrentQueue<TaskClass>();
for (int x = 0; x < TaskList.Count; x++)
    cq.Enqueue(TaskList[x]);

Parallel.For(
    0,
    cq.Count,
    new ParallelOptions { MaxDegreeOfParallelism = DISystem.MaxConcurrentThreads },
    x =>
    {
        TaskClass tc = null;
        if (cq.TryDequeue(out tc))
        {
            TaskTypeManager ttm = new TaskTypeManager();
            tc.Working = true;
            tc.Started = DateTime.Now;
            ttm.ProcessTaskItem(tc);
        }
    }
);

Now the issue I believe is when the Parallel.For loop completes, the original List<TaskClass> TaskList will not have been updated with the latest values.

What is the best way to accomplish this?

With modified code like the following? (lines marked with "//new")

ConcurrentQueue<TaskClass> cq = new ConcurrentQueue<TaskClass>();
for (int x = 0; x < TaskList.Count; x++)
    cq.Enqueue(TaskList[x]);

List<TaskClass> NewTaskList = new List<TaskClass>(); //new
object lockObject = new Object(); //new

Parallel.For(
    0,
    cq.Count,
    new ParallelOptions { MaxDegreeOfParallelism = DISystem.MaxConcurrentThreads },
    x =>
    {
        TaskClass tc = null;
        if (cq.TryDequeue(out tc))
        {
            TaskTypeManager ttm = new TaskTypeManager();
            tc.Working = true;
            tc.Started = DateTime.Now;
            ttm.ProcessTaskItem(tc);
            lock (lockObject) //new
            {
                NewTaskList.Add(tc);
            }
        }
    }
);

NewTaskList.Sort(); //new
TaskList.Clear(); //new
TaskList = NewTaskList.ToList(); //new

Or does anyone have any other ideas/suggestions/improvements?

2

There are 2 answers

1
Frank On BEST ANSWER

Will this work? - No. Maybe most of the time, but not if you really need it ordered.

There is an inherent problem with the statement "they do have to start in order". What do you mean with "start"? You probably have a race condition there. Consider this modification:

x =>
{
    TaskClass tc = null;
    if (cq.TryDequeue(out tc))
    {
        Thread.Sleep(random.Next(0, 1000));
        TaskTypeManager ttm = new TaskTypeManager();
         ...

As you can see, the only thing happening in order is your items being dequeued - after that, parallelism kicks in and no order is guaranteed. You need some kind of synchronization into ProcessTaskItem, up to the point where you consider the task to be actually "started".

3
Theodor Zoulias On

Note they do NOT have to complete in sequential order, just start in sequential.

In multithreading this doesn't make sense in general. The operating system can suspend any thread at any time for a duration of around 15 milliseconds (demo). So a thread that started before another thread, might be immediately suspended after executing just a couple of instructions, essentially before having a chance to do anything, while the second thread runs miles away. And you have no control over it.

In your case I believe that what you want is to serialize the assignment of the properties Working and Started. This is doable, but it requires an extra step. You must make the assignment while the Parallel loop enumerates the source, not inside the delegate that is invoked in parallel. The easiest way to do this is to expose the taskList as an iterator. Iterators in C# are methods that contain yield return statements. Here is an example:

IEnumerable<TaskClass> Iterator()
{
    foreach (TaskClass tc in taskList)
    {
        tc.Working = true;
        tc.Started = DateTime.Now;
        yield return tc;
    }
}

Now you could pass this iterator directly to the Parallel.ForEach method, but this would be a mistake. The Parallel.ForEach employs chunk partitioning by default, that needs to be disabled with the EnumerablePartitionerOptions.NoBuffering option, so that the parallel loop requests the next item from the iterator immediately before it is ready to process it:

var partitioner = Partitioner.Create(Iterator(),
    EnumerablePartitionerOptions.NoBuffering);

And now we are cooking:

Parallel.ForEach(
    partitioner,
    new ParallelOptions { MaxDegreeOfParallelism = DISystem.MaxConcurrentThreads },
    (TaskClass tc) =>
    {
        TaskTypeManager ttm = new TaskTypeManager();
        ttm.ProcessTaskItem(tc);
    }
);

The Working and Started properties will be assigned in enumeration order. The Parallel.ForEach synchronizes the enumeration of the source, so avoid doing anything heavy inside the iterator. Otherwise the synchronization may affect the performance of the whole operation.