Parallel.ForEachAsync keep sort order


I am trying to upload files using Parallel.ForEachAsync. It works, but the results lose the sort order of the source. Is there any way to keep the source and destination lists in the same order?

await Parallel.ForEachAsync(model.DestinationFiles,
    new ParallelOptions { MaxDegreeOfParallelism = 20 }, async (file, cancellationToken) =>
    {
        var storeAsync = await _fileServerService.Init(displayUrl).StoreAsync(file.FileInfo, false, file.OutputFileName);
        // Results are added in completion order, not source order.
        convertResultDto.Files.Add(new ConverterConvertResultFile(storeAsync));
    });

Previously I used Parallel LINQ (PLINQ), which has the AsOrdered operator to preserve ordering. Still, I think Parallel.ForEachAsync is a better fit for async methods doing I/O?

var storeFiles = model.DestinationFiles.AsParallel().AsOrdered().WithDegreeOfParallelism(50)
    .Select(file => StoreAsync(file.FileInfo, false, file.OutputFileName).GetAwaiter().GetResult())
    .Select(storeFile => new StoreFile
    {
        FileId = storeFile.FileId,
        Url = storeFile.Url,
        OutputFileName = storeFile.OutputFileName,
        Size = storeFile.Size
    });

There is 1 answer

Answered by Stephen Cleary

In this case, you want to get a set of results and store them in a resulting collection. Parallel is designed more for operations without results. For operations with results, you can use PLINQ for CPU-bound operations or asynchronous concurrency for I/O-bound operations. Unfortunately, there isn't a PLINQ equivalent for Parallel.ForEachAsync, which would be the closest equivalent to your current code.

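Asynchronous concurrency uses Task.WhenAll to get the results of multiple asynchronous operations. The returned array is in the same order as the tasks passed in, regardless of the order in which they complete, so the source order is preserved. It can also use SemaphoreSlim for throttling. Something like this: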

// Limits the number of uploads in flight to 20.
var throttle = new SemaphoreSlim(20);
// Task.WhenAll returns results in the order of the input tasks.
var results = await Task.WhenAll(model.DestinationFiles.Select(async file =>
{
  await throttle.WaitAsync();
  try
  {
    var storeAsync = await _fileServerService.Init(displayUrl).StoreAsync(file.FileInfo, false, file.OutputFileName);
    return new ConverterConvertResultFile(storeAsync);
  }
  finally { throttle.Release(); }
}));
convertResultDto.Files.AddRange(results);
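
To see the ordering guarantee in isolation, here is a minimal self-contained sketch (top-level statements, .NET 6 or later). FakeStoreAsync is a hypothetical stand-in for the real upload call, not part of the code above; it deliberately makes later items finish first, so completion order is the reverse of source order.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the upload: higher ids finish sooner.
static async Task<string> FakeStoreAsync(int id)
{
    await Task.Delay((5 - id) * 20);
    return $"file-{id}";
}

var inputs = Enumerable.Range(0, 5).ToArray();
using var throttle = new SemaphoreSlim(2); // at most 2 operations in flight

// The result array follows the order of the input tasks,
// not the order in which they complete.
string[] results = await Task.WhenAll(inputs.Select(async id =>
{
    await throttle.WaitAsync();
    try { return await FakeStoreAsync(id); }
    finally { throttle.Release(); }
}));

Console.WriteLine(string.Join(", ", results));
// prints "file-0, file-1, file-2, file-3, file-4"
```

Note that Select starts every lambda immediately; the semaphore only limits how many are past WaitAsync at once, which is usually fine for a moderate number of files.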

However, if you have a mixture of CPU-bound and I/O-bound operations, then you'll probably want to continue to use ForEachAsync. In that case, you can create the entries in your destination collection first, then pass each operation its index so it knows where to store its result:

// This code assumes convertResultDto.Files is empty at this point.
var count = model.DestinationFiles.Count;
convertResultDto.Files.AddRange(Enumerable.Repeat<ConverterConvertResultFile>(null!, count));
await Parallel.ForEachAsync(
    model.DestinationFiles.Select((file, i) => (file, i)),
    new ParallelOptions { MaxDegreeOfParallelism = 20 },
    async (item, cancellationToken) =>
    {
      var (file, i) = item;
      var storeAsync = await _fileServerService.Init(displayUrl).StoreAsync(file.FileInfo, false, file.OutputFileName);
      convertResultDto.Files[i] = new ConverterConvertResultFile(storeAsync);
    });
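
The same index-based idea can be tried in isolation with the sketch below (top-level statements, .NET 6 or later). It is an illustration under assumptions, not the code above: FakeStoreAsync is a hypothetical stand-in for the upload, and a pre-sized array is used for the slots instead of a list, since each iteration writes only to its own element.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the upload: higher ids finish sooner.
static async Task<string> FakeStoreAsync(int id, CancellationToken ct)
{
    await Task.Delay((5 - id) * 20, ct);
    return $"file-{id}";
}

var inputs = Enumerable.Range(0, 5).ToArray();
var slots = new string[inputs.Length]; // one pre-allocated slot per input

await Parallel.ForEachAsync(
    inputs.Select((id, index) => (id, index)),
    new ParallelOptions { MaxDegreeOfParallelism = 2 },
    async (item, ct) =>
    {
        // Each iteration writes only to its own index, so source order is kept.
        slots[item.index] = await FakeStoreAsync(item.id, ct);
    });

Console.WriteLine(string.Join(", ", slots));
// prints "file-0, file-1, file-2, file-3, file-4"
```

Once the loop has completed, the array can be copied into the destination collection in one step.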