How to create a custom grouped / hashed PLINQ partitioner or parallel query

I am trying to process a list of file paths in parallel using PLINQ. I have to process all files with the same name, excluding extension, in the same thread, as that thread may be renaming file extensions, causing issues if done from different threads at the same time.

From the docs it looks like hash-based partitioning can be achieved by using e.g. GroupBy(), or else I need to create a custom Partitioner. I could not find usable examples of either, at least none that I understood and could get to work.

See:
https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.partitioner
https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-implement-dynamic-partitions
https://devblogs.microsoft.com/pfxteam/partitioning-in-plinq/
https://weblogs.asp.net/dixin/parallel-linq-2-partitioning

I'd like to ask advice on how to use e.g. GroupBy(), or is there a pre-existing hash partition scheme where I can just provide the hash key to a function?

Example code:

// All files with the same path minus extension must be processed together
var fileList = new List<string>()
{
    "/path1/file1.ext",
    "/path1/file2.ext",
    "/path2/file1.avi",
    "/path1/file1.mkv",
    "/path1/file3.avi",
    "/path1/file1.avi",
    "/path2/file3.mkv",
    "/path1/file2.mkv"
};

// Group files by path ignoring extensions
var pathDictionary = new Dictionary<string, List<string>>(StringComparer.OrdinalIgnoreCase);
fileList.ForEach(path => {
    string normalPath = Path.Combine(Path.GetDirectoryName(path), Path.GetFileNameWithoutExtension(path));
    if (pathDictionary.TryGetValue(normalPath, out var pathList))
    {
        pathList.Add(path);
    }
    else
    {
        pathDictionary.Add(normalPath, new List<string> { path });
    }
});

// HOWTO: Skip the grouping and use GroupBy() or a native hash iterator?

// Process groups in parallel
var partitioner = Partitioner.Create(pathDictionary, EnumerablePartitionerOptions.NoBuffering);
partitioner.AsParallel()
    .ForAll(keyPair =>
    {
        keyPair.Value.ForEach(fileName => {
            Console.WriteLine($"Thread: {Environment.CurrentManagedThreadId}, Path: {fileName}");
            Thread.Sleep(100);
        });
    });

Best answer, by Theodor Zoulias

I think that you are close to solving your problem. The GroupBy operator emits groups of paths that share the same key, so you just have to run a foreach loop inside the ForAll lambda and process the paths of each group one by one:

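// fileList is the List<string> of paths from the question
fileList
    // Key = path without its extension, so all variants of a file land in one group
    .GroupBy(path => Path.ChangeExtension(path, ""), StringComparer.OrdinalIgnoreCase)
    .AsParallel()
    .ForAll(g =>
    {
        Console.WriteLine($"Thread: {Environment.CurrentManagedThreadId}, Key: {g.Key}");
        // Each group is enumerated by a single thread, one path at a time
        foreach (string path in g)
        {
            Console.WriteLine($"Thread: {Environment.CurrentManagedThreadId}, Path: {path}");
            Thread.Sleep(100);
        }
    });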

The GroupBy can be placed either before or after the AsParallel. It doesn't make much of a difference, because computing the key of each path is not CPU-intensive, so doing it sequentially should not be noticeably slower than doing it in parallel.
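
For illustration, here is a minimal sketch of the other ordering, with AsParallel placed before GroupBy so that the grouping itself runs as part of the PLINQ query. It reuses the sample paths from the question. The Program class and Main method are only scaffolding to make the snippet self-contained, and Thread.Sleep stands in for the real per-file work. Each group is still handed to a single ForAll invocation, so paths sharing a key are processed one after another on the same thread.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;

class Program
{
    static void Main()
    {
        // Sample paths taken from the question
        var fileList = new List<string>
        {
            "/path1/file1.ext",
            "/path1/file2.ext",
            "/path2/file1.avi",
            "/path1/file1.mkv",
            "/path1/file3.avi",
            "/path1/file1.avi",
            "/path2/file3.mkv",
            "/path1/file2.mkv"
        };

        fileList
            .AsParallel()
            // Same key as before: the path with its extension stripped, compared case-insensitively
            .GroupBy(path => Path.ChangeExtension(path, ""), StringComparer.OrdinalIgnoreCase)
            .ForAll(g =>
            {
                // One group per ForAll call; its paths are handled sequentially here
                foreach (string path in g)
                {
                    Console.WriteLine($"Thread: {Environment.CurrentManagedThreadId}, Key: {g.Key}, Path: {path}");
                    Thread.Sleep(100); // placeholder for the real work
                }
            });
    }
}
```

The order in which groups are printed, and the thread IDs, will vary from run to run. What should stay constant is that all lines with the same Key show the same thread ID.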