I have a TransformManyBlock with the following design:
- Input: Path to a file
- Output: IEnumerable of the file's contents, one line at a time
I am running this block on a huge file (61GB), which is too large to fit into RAM. In order to avoid unbounded memory growth, I have set BoundedCapacity to a very low value (e.g. 1) for this block and all downstream blocks. Nonetheless, the block apparently iterates the IEnumerable greedily, which consumes all available memory on the computer and grinds every process to a halt. The OutputCount of the block keeps rising without bound until I kill the process.
What can I do to prevent the block from consuming the IEnumerable in this way?
EDIT: Here's an example program that illustrates the problem:
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    // Lazily yields 1024*1024 strings of 1 MB each (about 1 TB in total).
    static IEnumerable<string> GetSequence(char c)
    {
        for (var i = 0; i < 1024 * 1024; ++i)
            yield return new string(c, 1024 * 1024);
    }

    static void Main(string[] args)
    {
        var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 };
        var firstBlock = new TransformManyBlock<char, string>(c => GetSequence(c), options);
        var secondBlock = new ActionBlock<string>(str =>
        {
            Console.WriteLine(str.Substring(0, 10));
            Thread.Sleep(1000); // slow consumer
        }, options);

        firstBlock.LinkTo(secondBlock);
        // Forward completion or failure from the first block to the second.
        firstBlock.Completion.ContinueWith(task =>
        {
            if (task.IsFaulted) ((IDataflowBlock) secondBlock).Fault(task.Exception);
            else secondBlock.Complete();
        });

        firstBlock.Post('A');
        firstBlock.Complete();

        // Monitor how many items are waiting in the first block's output queue.
        for (; ; )
        {
            Console.WriteLine("OutputCount: {0}", firstBlock.OutputCount);
            Thread.Sleep(3000);
        }
    }
}
```
If you're on a 64-bit box, make sure to clear the "Prefer 32-bit" option in Visual Studio. I have 16GB of RAM on my computer, and this program immediately consumes every available byte.
You seem to misunderstand how TPL Dataflow works.
BoundedCapacity limits the number of items a block will accept. In your case, that means a single char into the TransformManyBlock and a single string into the ActionBlock.
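A small sketch of that acceptance limit, assuming a .NET version where System.Threading.Tasks.Dataflow is available. The action is held on a gate (a ManualResetEventSlim introduced only for this demo) so the first item stays inside the block, and Post, which returns false when the target declines an item, shows the second item being refused.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class BoundedCapacityDemo
{
    static void Main()
    {
        // Demo-only gate: keeps the action busy so the first item stays in the block.
        using var gate = new ManualResetEventSlim(false);

        var options = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
        var block = new ActionBlock<string>(s => gate.Wait(), options);

        bool first = block.Post("first");   // accepted: the block held 0 items
        bool second = block.Post("second"); // declined: the single slot is still in use

        Console.WriteLine($"first accepted: {first}, second accepted: {second}");

        gate.Set();            // let the pending action finish
        block.Complete();
        block.Completion.Wait();
    }
}
```

The bound applies to what the block takes in; it says nothing about how many outputs a single accepted input may produce, which is why one char can still fan out into millions of strings.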
So you post a single item to the TransformManyBlock, which then returns 1024*1024 strings and tries to pass them on to the ActionBlock, which will only accept one at a time. The rest of the strings just sit in the TransformManyBlock's output queue.

What you probably want to do is create a single block and post items into it in a streaming fashion, waiting (synchronously or asynchronously) whenever its capacity is reached.
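A minimal sketch of that streaming approach, assuming the per-line work can live in an ActionBlock. GetLines and the produced counter are hypothetical stand-ins; for the real 61GB file you would iterate File.ReadLines(path), which also reads lazily. DataflowBlock.SendAsync returns a Task<bool> that completes only once the block has accepted (true) or permanently declined (false) the item, so the reading loop pauses while the block is full. A synchronous variant would call consumer.SendAsync(line).Result instead of awaiting.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class StreamingDemo
{
    static int produced; // hypothetical counter: lines pulled from the source so far

    // Hypothetical lazy source standing in for File.ReadLines(path).
    static IEnumerable<string> GetLines(int count)
    {
        for (var i = 0; i < count; ++i)
        {
            produced++;
            yield return "line " + i;
        }
    }

    static async Task Main()
    {
        var options = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
        var consumer = new ActionBlock<string>(async line =>
        {
            await Task.Delay(100); // simulate slow downstream work
            Console.WriteLine(line);
        }, options);

        foreach (var line in GetLines(5))
        {
            // Waits until the consumer has room, so at most one line sits in the
            // block while the next one waits here; nothing piles up in memory.
            if (!await consumer.SendAsync(line))
                break; // consumer faulted or completed; stop reading
        }

        consumer.Complete();
        await consumer.Completion;
        Console.WriteLine($"produced: {produced}");
    }
}
```

Here the producer is just a foreach loop rather than a TransformManyBlock, so the source enumerable is only advanced as fast as the consumer accepts items.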