I'm trying to change Stephen Toub's ForEachAsync<T> extension method into an extension which returns a result...
Stephen's extension:
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current);
        }));
}
My approach (not working; the tasks get executed but the result is wrong):
public static Task<TResult[]> ForEachAsync<T, TResult>(this IList<T> source,
    int degreeOfParallelism, Func<T, Task<TResult>> body)
{
    return Task.WhenAll<TResult>(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run<TResult>(async () =>
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current); // When I "return await",
                                                   // I get good results but only one per partition
            return default(TResult);
        }));
}
I know I somehow have to return (WhenAll?) the results from the last part, but I haven't yet figured out how to do it...
Update: The result I get is just null repeated degreeOfParallelism times (I guess because of default(TResult)), even though all the tasks get executed. I also tried return await body(...); the results were then fine, but only degreeOfParallelism tasks got executed.
Your LINQ query can only ever have the same number of results as the number of partitions - you're just projecting each partition into a single result.
If you don't care about the order, you just need to assemble the results of each partition into a list, then flatten them afterwards.
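A minimal sketch of that idea might look like the following. The method name SelectAsync, the class names, and the demo values in Main are assumptions made for illustration; the rest reuses the Partitioner / Task.Run / Task.WhenAll pattern from the question. Each partition's task collects its own results in a List<TResult>, and the lists are flattened once every task has finished.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class AsyncSelectExtensions
{
    // "SelectAsync" is a placeholder name chosen for this sketch.
    public static async Task<TResult[]> SelectAsync<T, TResult>(
        this IList<T> source,
        int degreeOfParallelism,
        Func<T, Task<TResult>> body)
    {
        // One task per partition; each task gathers every result it produces.
        List<TResult>[] lists = await Task.WhenAll(
            Partitioner.Create(source)
                .GetPartitions(degreeOfParallelism)
                .Select(partition => Task.Run<List<TResult>>(async () =>
                {
                    var list = new List<TResult>();
                    using (partition)
                    {
                        while (partition.MoveNext())
                        {
                            list.Add(await body(partition.Current));
                        }
                    }
                    return list;
                })));

        // Flatten the per-partition lists. The order of the results is
        // not guaranteed to match the order of the source.
        return lists.SelectMany(list => list).ToArray();
    }
}

public static class Demo
{
    public static async Task Main()
    {
        IList<int> inputs = Enumerable.Range(1, 10).ToList();

        int[] squares = await inputs.SelectAsync(3, async x =>
        {
            await Task.Delay(10); // stand-in for real asynchronous work
            return x * x;
        });

        Console.WriteLine(squares.Length); // prints 10: one result per input
    }
}
```

The number of tasks is still degreeOfParallelism, but the number of results now matches the number of source items, because each task returns a whole list rather than a single value.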
(I've renamed this from ForEachAsync, as ForEach sounds imperative (suitable for the Func<T, Task> in the original) whereas this is fetching results. A foreach loop doesn't have a result - this does.)