Merging scalaz-stream input processes seems to "wait" on stdin

233 views Asked by At

I have a simple program:

import scalaz._
import stream._

object Play extends App {
  val in1 = io.linesR("C:/tmp/as.txt")
  val in2 = io.linesR("C:/tmp/bs.txt")

  val p = (in1 merge in2) to io.stdOutLines
  p.run.run
}

The file as.txt contains five as and the file bs.txt contain 3 bs. I see this sort of output:

a
b
b
a
a
b
a
a
a

However, when I change the declaration of in2 as follows:

val in2 = io.stdInLines

Then I get what I think is unexpected behaviour. According to the documentation 1, the program should pull data non-deterministically from each stream according to whichever stream is quicker to supply stuff. This should mean that I see a bunch of as immediately printed to the console but this is not what happens at all.

Indeed, until I press ENTER, nothing happens. It's quite clear that the behaviour looks a lot like what I would expect if I was choosing a stream at random to get the next element from and then, if that stream was blocking, the merged process blocks too (even if the other stream contains data).

What is going on?

1 - well, OK, there is very little documentation, but Dan Spiewak said very clearly in his talk that it would grab whoever was the first to supply data

1

There are 1 answers

0
ryskajakub On BEST ANSWER

The problem is in the implementation of stdInLines. It is blocking, it never Task.forks a thread.

Try changing the implentation of stdInLines to this one:

def stdInLines: Process[Task,String] =
    Process.repeatEval(Task.apply { 
    Option(scala.Console.readLine())
    .getOrElse(throw Cause.Terminated(Cause.End))
})

The original io.stdInLines is running the readLine() in the same thread, so it always waits there until you type something.