Asynchronously manipulating data from streamReader in F#

273 views Asked by At

On the line of Read large txt file multithreaded?, I have the doubt of whether it is equivalent to pass to each thread an sliced chunk of a Seq and whether it will safely handle the paralellism; is it StreamReader thread-safe?

Here is the code I am using to test this (any advice or critics on the used pattern is welcome :) )

nthreads = 4    

let Data = seq {
        use sr = new System.IO.StreamReader (filePath)
        while not sr.EndOfStream do
            yield sr.ReadLine ()
        }

let length = (Data |> Seq.length)

let packSize = length / nthreads

let groups =
     [ for i in 0..(nthreads - 1) -> if i < nthreads - 1  then Data |> Seq.skip( packSize * i )
                                                                    |> Seq.take( packSize )
                                                          else Data |> Seq.skip( packSize * i ) ]

let f = some_complex_function_modifiying_data

seq{ for a in groups -> f a }
        |> Async.Parallel
        |> Async.RunSynchronously
1

There are 1 answers

1
Tomas Petricek On BEST ANSWER

Your Data value has a type seq<string>, which means that it is lazy. This means that when you perform some computation that accesses it, the lazy sequence will create a new instance of StreamReader and read the data independently of other computations.

You can easily see this when you add some printing to the seq { .. } block:

let Data = seq {
    printfn "reading"
    use sr = new System.IO.StreamReader (filePath)
    while not sr.EndOfStream do
        yield sr.ReadLine ()  }

As a result, your parallel processing is actually fine. It will create a new computation for every single parallel thread and so the StreamReader instances are never shared.

Another question is if this is actually a useful thing to do - reading data from disk is often a bottle neck and so it might be faster to just do things in one loop. Even if this works, using Seq.length is a slow way to get the length (because it needs to read the whole file) and the same for skip. A better (but more complex) solution would probably be to use stream Seek.