Use Parallel.For in batches in dotnet core

1.4k views Asked by At

I am using httptrigger function in dotnet core where i am getting httprequest data in Json format.I need to insert this value in Google Merchant center account. There are almost 9000 rows (dynamic data each time) that needs to be inserted. How i can implement the Parallel.for logic which will execute faster. Currently i am using for each loop like below but it is taking more time. Below is the code.

string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
dynamic body = JsonConvert.DeserializeObject(requestBody);
for (int i =0;i<body.Count;i++)
{
  Product newProduct = InsertProduct(merchantId, websiteUrl,body[i]);
}
2

There are 2 answers

3
Martin On BEST ANSWER

I created a small example maybe there you can find the best way which fits your case best.

dotnet fiddle Example

There are 3 options:

In sequence

As the title says every item is processed in sequence. Very save method but not the fastest one to process 9000 items :)

var list = GenerateItems();
var count = list.Count();
for(var i = 0; i < count; i++) 
{
    InsertInDatabaseAsync($"{i}", list.ElementAt(i)).GetAwaiter().GetResult();
}

With Parallel.For Library

Like said from the comments its good for CPU bound processing but has some lacks on async methods (here)

var list = GenerateItems();
var count = list.Count();
var options = new ParallelOptions{MaxDegreeOfParallelism = MAX_DEGREE_OF_PARALLELISM};
Parallel.For(0, count, options, (i) => 
{
    InsertInDatabaseAsync($"{i}", list.ElementAt(i)).GetAwaiter().GetResult();
});

With Async-Await

I think in your example this fits best for you. Every item is processed in parallel, starts the processing directly and spinns up a Task. (Copied the async-extension from here)

var list = GenerateItems();
var count = list.Count();

// Extensions method see in referenced SO answer
ForEachAsync(count, list, async (item, index) => 
{
    await InsertInDatabaseAsync($"{index}", item);
}).GetAwaiter().GetResult();

...Updated

Thanks for the comments. I have updated the async-await implementation to a more simpler one:

private static async Task ForEachAsync<T>(IEnumerable<T> enumerable, Func<T, int, Task> asyncFunc)
{
    var itemsCount = enumerable.Count();
    var tasks = new Task[itemsCount];
    int i = 0;
    foreach (var t in enumerable)
    {
        tasks[i] = asyncFunc(t, i);
        i++;
    }
    await Task.WhenAll(tasks);
}

And also added the MAX_DEGREE_OF_PARALLELISM set to 1. This has a huge impact on the parallel processing like described in the commends.

enter image description here

2
Oluwadamilola Adegunwa On

Do this.

string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
dynamic body = JsonConvert.DeserializeObject(requestBody);
Parallel.For(0, body.Count, i => {
    Product newProduct = InsertProduct(merchantId, websiteUrl,body[i]);
});