I'm just getting started with fs2 streams. I want to read a file (a large one, which is why I'm using fs2), transform it, and write the result to two different files, chosen by a predicate. Here is some code (from https://github.com/typelevel/fs2), with my comment:
    val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
      def fahrenheitToCelsius(f: Double): Double =
        (f - 32.0) * (5.0/9.0)

      io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
        .through(text.utf8Decode)
        .through(text.lines)
        .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
        .map(line => fahrenheitToCelsius(line.toDouble).toString)
        .intersperse("\n")
        .through(text.utf8Encode)
        .through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
        /* instead of the last line I want something like this:
        .through(<write temperatures higher than 10 to one file, the rest to the other one>)
        */
    }
What is the most efficient way to do this? The obvious solution is to have two streams with different filters, but that is inefficient, because the input file is read and converted twice.
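For reference, the two-pass version might look roughly like the sketch below. It assumes fs2 2.x on cats-effect 2 (the `Blocker`-based API used in the snippet above). The object name, the helpers `celsius` and `writeTo`, and both output file names are hypothetical and only serve the illustration.

```scala
import java.nio.file.{Path, Paths}

import cats.effect.{Blocker, ExitCode, IO, IOApp}
import fs2.{io, text, Stream}

// Sketch only: assumes fs2 2.x / cats-effect 2. IOApp supplies the implicit ContextShift[IO].
object TwoPassConverter extends IOApp {

  def fahrenheitToCelsius(f: Double): Double =
    (f - 32.0) * (5.0 / 9.0)

  // Describes reading and converting the input; each use of this stream re-reads the file.
  def celsius(blocker: Blocker): Stream[IO, Double] =
    io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
      .through(text.utf8Decode)
      .through(text.lines)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble))

  // Writes one temperature per line to the given path.
  def writeTo(path: Path, blocker: Blocker)(temps: Stream[IO, Double]): Stream[IO, Unit] =
    temps
      .map(_.toString)
      .intersperse("\n")
      .through(text.utf8Encode)
      .through(io.file.writeAll[IO](path, blocker))

  def converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
    // Hypothetical output paths; the threshold of 10 comes from the comment in the question.
    val high = writeTo(Paths.get("testdata/celsius-high.txt"), blocker)(celsius(blocker).filter(_ > 10.0))
    val low  = writeTo(Paths.get("testdata/celsius-low.txt"), blocker)(celsius(blocker).filter(_ <= 10.0))
    high ++ low // first pass, then second pass over the same input
  }

  def run(args: List[String]): IO[ExitCode] =
    converter.compile.drain.map(_ => ExitCode.Success)
}
```

Because `celsius(blocker)` is evaluated once per output file, the input is decoded and converted twice, which is exactly the cost I would like to avoid.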
Unfortunately, as far as I know, there is no easy way to split an fs2 stream into two.
What you could do is split your stream by pushing each value to one of two queues (the first for values under 10, the second for values greater than or equal to 10). If we use NoneTerminatedQueue, the queues are not terminated until we put None into them. We can then use dequeue to create separate streams that keep running until the queues are closed. An example solution is below. I split writing to a file and reading into separate methods: