I'm trying to process a large number of text files via Parallel.ForEach, adding the processed data to a BlockingCollection.

The problem is that I want the task taskWriteMergedFile to consume the collection and write it to the result file at least every 800,000 lines. I guess I can't test the collection size within the iteration because it is parallelized, so I created the task.

Can I convert the while(true) loop in the task to an EventWaitHandle in this case?
const int MAX_SIZE = 1000000;

static BlockingCollection<string> mergeData;
mergeData = new BlockingCollection<string>(new ConcurrentBag<string>(), MAX_SIZE);

string[] FilePaths = Directory.GetFiles("somepath");

var taskWriteMergedFile = new Task(() =>
{
    while (true)
    {
        if (mergeData.Count > 800000)
        {
            String.Join(System.Environment.NewLine, mergeData.GetConsumingEnumerable());
            // Write to file
        }
        Thread.Sleep(10000);
    }
}, TaskCreationOptions.LongRunning);
taskWriteMergedFile.Start();

Parallel.ForEach(FilePaths, FilePath => AddToDataPool(FilePath));
mergeData.CompleteAdding();
You probably don't want to do it that way. Instead, have your task write each line to the file as it's received. If you want to limit each file to 800,000 lines, then after the 800,000th line is written, close the current file and open a new one.
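A minimal sketch of that rotation, assuming the mergeData collection above and a hypothetical NextOutputPath() helper that generates successive output file names:

int linesWritten = 0;
var writer = new StreamWriter(NextOutputPath()); // NextOutputPath() is a hypothetical naming helper

// Streams items one at a time; the loop exits once CompleteAdding() is called
foreach (var line in mergeData.GetConsumingEnumerable())
{
    writer.WriteLine(line);
    if (++linesWritten == 800000)
    {
        // Roll over to a new file after every 800,000 lines
        writer.Dispose();
        writer = new StreamWriter(NextOutputPath());
        linesWritten = 0;
    }
}
writer.Dispose();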
Come to think of it, what you have can't work, because GetConsumingEnumerable() won't stop until the collection is marked as complete for adding. What would happen is that the thing would go through the sleep loop until there were 800,000 items in the queue, and then it would block on the String.Join until the main thread calls CompleteAdding. With enough data, you'd run out of memory.

Also, unless you have a very good reason, you shouldn't use ConcurrentBag here. Just use the default for BlockingCollection, which is ConcurrentQueue. ConcurrentBag is a rather special-purpose data structure that won't perform as well as ConcurrentQueue.
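With the default backing store, the construction is just the bounded-capacity constructor (which uses a ConcurrentQueue internally):

mergeData = new BlockingCollection<string>(MAX_SIZE);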
So your task becomes:
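Something like this, where outputFile is a TextWriter you've opened elsewhere (a sketch):

var taskWriteMergedFile = new Task(() =>
{
    // Blocks while the collection is empty; exits when CompleteAdding() is called
    foreach (var line in mergeData.GetConsumingEnumerable())
    {
        outputFile.WriteLine(line);
    }
}, TaskCreationOptions.LongRunning);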
That assumes, of course, that you've opened the output file somewhere else. It's probably better to open the output at the start of the task, and close it after the foreach has exited.

On another note, you probably don't want your producer loop to be parallel. You have:
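Parallel.ForEach(FilePaths, FilePath => AddToDataPool(FilePath));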
I don't know for sure what AddToDataPool is doing, but if it's reading a file and writing the data to the collection, you have a couple of problems. First, the disk drive can only do one thing at a time, so it ends up reading part of one file, then part of another, then part of another, etc. In order to read each chunk of the next file, it has to seek the head to the proper position. Disk head seeks are incredibly expensive: 5 milliseconds or more, an eternity in CPU time. Unless you're doing heavy-duty processing that takes much longer than reading the file, you're almost always better off processing one file at a time (unless you can guarantee that the input files are on separate physical disks).

The second potential problem is that with multiple threads running, you can't guarantee the order in which things are written to the collection. That might not be a problem, of course, but if you expect all of the data from a single file to be grouped together in the output, that's not going to happen with multiple threads each writing multiple lines to the collection.
Just something to keep in mind.
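If the ordering or the seek thrashing turns out to matter, the sequential version is a one-line change (a sketch, keeping the names from the question):

foreach (var FilePath in FilePaths)
{
    AddToDataPool(FilePath);
}
mergeData.CompleteAdding();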