Can I convert while(true) loop to EventWaitHandle?


I'm trying to process a large number of text files via Parallel.ForEach, adding the processed data to a BlockingCollection.

The problem is that I want the task taskWriteMergedFile to consume the collection and write its contents to the result file at least every 800000 lines.

I guess that I can't test the collection size within the iteration because it runs in parallel, so I created the Task.

Can I convert while(true) loop in the task to EventWaitHandle in this case?

const int MAX_SIZE = 1000000;
static BlockingCollection<string> mergeData;
mergeData = new BlockingCollection<string>(new ConcurrentBag<string>(), MAX_SIZE);


string[] FilePaths = Directory.GetFiles("somepath");

var taskWriteMergedFile = new Task(() =>
{
    while ( true )
    {
        if ( mergeData.Count  > 800000)
        {
            String.Join(System.Environment.NewLine, mergeData.GetConsumingEnumerable());
            //Write to file
        }
        Thread.Sleep(10000); 
    }
}, TaskCreationOptions.LongRunning);

taskWriteMergedFile.Start();
Parallel.ForEach(FilePaths, FilePath => AddToDataPool(FilePath));
mergeData.CompleteAdding();

There is 1 answer

Jim Mischel

You probably don't want to do it that way. Instead, have your task write each line to the file as it's received. If you want to limit each file to 800,000 lines, then after the 800,000th line is written, close the current file and open a new one.

Come to think of it, what you have can't work because GetConsumingEnumerable() won't stop until the collection is marked as complete for adding and has been emptied. The task would go through the sleep loop until there were more than 800,000 items in the collection, and then it would block inside String.Join until the main thread calls CompleteAdding. In the meantime String.Join would keep accumulating every line the producers add, so with enough data you'd run out of memory.
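The blocking behavior described above can be seen in a small sketch. The class and method names here (ConsumingEnumerableDemo, Run) are made up for illustration and the 500 ms wait is an arbitrary choice; only BlockingCollection, GetConsumingEnumerable and CompleteAdding come from the question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original code.
static class ConsumingEnumerableDemo
{
    // Reports whether the consumer finished before CompleteAdding was called,
    // and how many items it saw in total.
    public static (bool finishedEarly, int itemCount) Run()
    {
        var queue = new BlockingCollection<string>();
        queue.Add("a");
        queue.Add("b");

        // The consumer takes both items, then blocks waiting for more.
        Task<int> consumer = Task.Run(() =>
        {
            int count = 0;
            foreach (var item in queue.GetConsumingEnumerable())
                count++;
            return count;
        });

        // Wait returns false on timeout: the enumeration is still blocked
        // even though the collection is empty.
        bool finishedEarly = consumer.Wait(TimeSpan.FromMilliseconds(500));

        // Only after this call can the foreach loop end.
        queue.CompleteAdding();
        return (finishedEarly, consumer.Result);
    }
}
```

Passing GetConsumingEnumerable() to String.Join has the same effect as the foreach here: the call cannot return until CompleteAdding has been called.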

Also, unless you have a very good reason, you shouldn't use ConcurrentBag here. Just use the default backing store for BlockingCollection, which is ConcurrentQueue. ConcurrentBag is a rather special-purpose data structure that makes no ordering guarantee and won't perform as well as ConcurrentQueue here.
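As a minimal sketch of that change, the collection can be created with only a bounded capacity, which leaves BlockingCollection on its default ConcurrentQueue. The MergeDataFactory wrapper is a hypothetical name used only to keep the snippet self-contained; the capacity matches MAX_SIZE from the question.

```csharp
using System.Collections.Concurrent;

// Hypothetical helper, shown only to keep the example self-contained.
static class MergeDataFactory
{
    public const int MaxSize = 1000000;

    // No backing collection is passed, so BlockingCollection uses a
    // ConcurrentQueue<string> and items come out in FIFO order.
    public static BlockingCollection<string> Create()
    {
        return new BlockingCollection<string>(MaxSize);
    }
}
```

With a single producer, items are then taken in the order they were added; with several producers, the order still depends on how the threads interleave.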

So your task becomes:

var taskWriteMergedFile = new Task(() =>
{
    int recordCount = 0;
    // Blocks while the collection is empty; the loop ends once
    // CompleteAdding has been called and the collection is drained.
    foreach (var line in mergeData.GetConsumingEnumerable())
    {
        outputFile.WriteLine(line);
        ++recordCount;
        if (recordCount == 800000)
        {
            // If you want to do something after 800,000 lines, do it here
            // and then reset the record count
            recordCount = 0;
        }
    }
}, TaskCreationOptions.LongRunning);

That assumes, of course, that you've opened the output file somewhere else. It's probably better to open the output at the start of the task, and close it after the foreach has exited.
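One way this might look, with the writer opened and closed inside the consumer and a new file started after a fixed number of lines, is sketched below. The names MergedFileWriter and WriteInChunks, the outputDir and linesPerFile parameters, and the merged_NNNN.txt naming scheme are all assumptions made for the example, not anything from the question.

```csharp
using System.Collections.Concurrent;
using System.IO;

// Hypothetical helper; names and file naming scheme are illustrative only.
static class MergedFileWriter
{
    // Drains `source` into numbered files holding at most `linesPerFile`
    // lines each. Returns the number of files written.
    public static int WriteInChunks(
        BlockingCollection<string> source, string outputDir, int linesPerFile)
    {
        int fileIndex = 0;
        int recordCount = 0;
        StreamWriter writer = null;
        try
        {
            foreach (var line in source.GetConsumingEnumerable())
            {
                // Open the next file lazily so no empty file is left behind.
                if (writer == null)
                {
                    string path = Path.Combine(outputDir, $"merged_{fileIndex:D4}.txt");
                    writer = new StreamWriter(path);
                    fileIndex++;
                }

                writer.WriteLine(line);

                // Close the current file once it has reached the limit.
                if (++recordCount == linesPerFile)
                {
                    writer.Dispose();
                    writer = null;
                    recordCount = 0;
                }
            }
        }
        finally
        {
            // Flush and close whatever file is still open.
            writer?.Dispose();
        }
        return fileIndex;
    }
}
```

The task body would then reduce to a single call such as MergedFileWriter.WriteInChunks(mergeData, "somedir", 800000), where "somedir" stands in for whatever output directory is wanted.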

On another note, you probably don't want your producer loop to be parallel. You have:

Parallel.ForEach(FilePaths, FilePath => AddToDataPool(FilePath));

I don't know for sure what AddToDataPool is doing, but if it's reading a file and writing the data to the collection, you have a couple of problems. First, the disk drive can only do one thing at a time, so it ends up reading part of one file, then part of another, then part of another, and so on. To read each chunk of the next file, it has to seek the head to the proper position. Disk head seeks are incredibly expensive: 5 milliseconds or more, an eternity in CPU time. Unless you're doing heavy-duty processing that takes much longer than reading the file, or you can guarantee that the input files are on separate physical disks, you're almost always better off processing one file at a time.

The second potential problem is that with multiple threads running, you can't guarantee the order in which things are written to the collection. That might not be a problem, of course, but if you expect all of the data from a single file to be grouped together in the output, that's not going to happen with multiple threads each writing multiple lines to the collection.
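A sequential producer along these lines might look like the sketch below. It assumes AddToDataPool does nothing more than read a file and add each line to the collection, which the question does not confirm; SequentialProducer and AddFilesInOrder are hypothetical names.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;

// Hypothetical replacement for the Parallel.ForEach producer.
static class SequentialProducer
{
    // Reads the files one at a time, in the order given, so all lines
    // from one file stay together in the collection.
    public static void AddFilesInOrder(
        IEnumerable<string> filePaths, BlockingCollection<string> target)
    {
        try
        {
            foreach (var path in filePaths)
            {
                // File.ReadLines streams the file instead of loading it whole.
                foreach (var line in File.ReadLines(path))
                {
                    // Add blocks when the bounded collection is full.
                    target.Add(line);
                }
            }
        }
        finally
        {
            // Lets the consumer's GetConsumingEnumerable loop finish,
            // even if reading a file throws.
            target.CompleteAdding();
        }
    }
}
```

This keeps one reader and one writer running concurrently, and the grouping holds only if the collection is backed by a queue rather than a ConcurrentBag.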

Just something to keep in mind.