Splitting the fs2 stream output to two files


I'm just starting out with fs2 streams. I want to read a file (a large one, which is why I'm using fs2), transform it, and write the result to two different files based on a predicate. Here is the example from https://github.com/typelevel/fs2, with my comment added:

  val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
    def fahrenheitToCelsius(f: Double): Double =
      (f - 32.0) * (5.0/9.0)

    io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
      .through(text.utf8Decode)
      .through(text.lines)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble).toString)
      .intersperse("\n")
      .through(text.utf8Encode)
      .through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
      /* instead of the last line I want something like this:
      .through(<write temperatures higher than 10 to one file, the rest to the other one>)
      */
  }

What is the most efficient way to do so? The obvious solution is to run two streams with different filters, but that is inefficient because the input is read and converted twice.


There are 3 answers

Krzysztof Atłasik On BEST ANSWER

Unfortunately, as far as I know, there's no easy way to split an fs2 stream into two.

What you can do is split the stream by pushing each value to one of two queues (one for values over 10, the other for values of 10 or below). With a NoneTerminatedQueue, a queue stays open until None is enqueued, so dequeue gives you a separate stream per queue that runs until its queue is terminated.

Example solution below. I split reading and writing to a file into separate methods:

import java.nio.file.Paths
import cats.effect.{Blocker, ExitCode, IO, IOApp}
import fs2.concurrent.{NoneTerminatedQueue, Queue}
import fs2.{Stream, io, text}

object FahrenheitToCelsius extends IOApp {

  def fahrenheitToCelsius(f: Double): Double =
    (f - 32.0) * (5.0 / 9.0)

  //reading is split out into its own method
  def read(blocker: Blocker, over: NoneTerminatedQueue[IO, Double], under: NoneTerminatedQueue[IO, Double]) =
    io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
      .through(text.utf8Decode)
      .through(text.lines)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble))
      .evalMap { value =>
        if (value > 10) { //route each value to one of the two queues
          over.enqueue1(Some(value)) //a queue stays open as long as we only enqueue Some
        } else {
          under.enqueue1(Some(value))
        }
      }
      .onFinalize(
        over.enqueue1(None) *> under.enqueue1(None) //enqueueing None terminates the queues
      )

  //write takes the source stream (a dequeued queue) and the target file name
  def write(s: Stream[IO, Double], blocker: Blocker, fileName: String): Stream[IO, Unit] = {
    s.map(_.toString)
      .intersperse("\n")
      .through(text.utf8Encode)
      .through(io.file.writeAll(Paths.get(fileName), blocker))
  }

  val converter: Stream[IO, Unit] = for {
    over <- Stream.eval(Queue.noneTerminated[IO, Double]) //create the two queues
    under <- Stream.eval(Queue.noneTerminated[IO, Double])
    blocker <- Stream.resource(Blocker[IO])
    _ <- write(over.dequeue, blocker, "testdata/celsius-over.txt")
      //merge waits for both writers; concurrently would interrupt the second writer as soon as the first one finished
      .merge(write(under.dequeue, blocker, "testdata/celsius-under.txt"))
      .concurrently(read(blocker, over, under)) //the reader runs in the background and terminates both queues when it is done
  } yield ()

  override def run(args: List[String]): IO[ExitCode] =
    converter
      .compile
      .drain
      .as(ExitCode.Success)

}
Dawid Łakomy On

I've managed to find another solution. Here it is:

import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource}
import fs2.{io, text, Stream}
import fs2.io.file.WriteCursor
import java.nio.file.Paths

object Converter extends IOApp {

  val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
    def fahrenheitToCelsius(f: Double): Double =
      (f - 32.0) * (5.0/9.0)

    def saveFiltered(in: Stream[IO, Double], blocker: Blocker, filename: String, filter: Double => Boolean) = {
      //convert to String before interspersing so the stream stays a Stream[IO, String]
      val processed = in.filter(filter).map(_.toString).intersperse("\n").through(text.utf8Encode)

      Stream.resource(WriteCursor.fromPath[IO](Paths.get(filename), blocker)).flatMap(_.writeAll(processed).void.stream)
    }

    io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
      .through(text.utf8Decode)
      .through(text.lines)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble))
      .observe( in => saveFiltered(in, blocker, "testdata/celsius_over.txt", {n => n >= 0}) )
      .through( in => saveFiltered(in, blocker, "testdata/celsius_below.txt", {n => n < 0}) )
  }

  def run(args: List[String]): IO[ExitCode] =
    converter.compile.drain.as(ExitCode.Success)
}

I think it's a bit easier to understand than the answer involving queues (queues appear to be a common solution to similar cases, though).

l7r7 On

It's also possible to use broadcastThrough, which allows broadcasting all elements of a stream to multiple Pipes.

A full solution to your problem could look like this. It uses cats-effect 3.3.8 and fs2 3.2.5, which is why it looks a bit different, but the main idea is the same regardless of the versions:

import cats.effect.{IO, IOApp}
import fs2.io.file.{Files, Path}
import fs2.{Pipe, Stream, text}

object Converter extends IOApp.Simple {

  val converter: Stream[IO, Unit] = {
    def fahrenheitToCelsius(f: Double): Double =
      (f - 32.0) * (5.0 / 9.0)

    def saveFiltered(filename: Path, predicate: Double => Boolean): Pipe[IO, Double, Unit] =
      _.filter(predicate)
        .map(_.toString)
        .intersperse("\n") // one value per line, as in the original example
        .through(text.utf8.encode)
        .through(Files[IO].writeAll(filename))

    Files[IO].readAll(Path("testdata/fahrenheit.txt"))
      .through(text.utf8.decode)
      .through(text.lines)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble))
      .broadcastThrough(
        saveFiltered(Path("testdata/celsius_over.txt"), { n => n >= 0 }),
        saveFiltered(Path("testdata/celsius_below.txt"), { n => n < 0 })
      )
  }

  def run: IO[Unit] =
    converter.compile.drain
}

saveFiltered is now a function that returns a Pipe built from a filename and a predicate. It is used to build the two arguments for broadcastThrough. I tested it on a small example and, FWIW, it works as expected.

broadcastThrough guarantees that all elements from the stream are sent to all pipes. There's one little caveat that's mentioned in the Scaladoc: the slowest pipe will cause the whole stream to slow down. I don't think this is a problem in this particular case because I'd guess that both pipes are equally fast.
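To make the "every element reaches every pipe" behaviour concrete, here is a minimal in-memory sketch, assuming the same fs2 3.x and cats-effect 3 versions as above. The names BroadcastDemo, evens and odds are made up for illustration; no files are involved.

```scala
import cats.effect.{IO, IOApp}
import fs2.{Pipe, Stream}

object BroadcastDemo extends IOApp.Simple {

  // Both pipes receive every element; each keeps only the ones it cares about.
  val evens: Pipe[IO, Int, String] = _.filter(_ % 2 == 0).map(n => s"even: $n")
  val odds: Pipe[IO, Int, String]  = _.filter(_ % 2 != 0).map(n => s"odd: $n")

  // The outputs of the two pipes are merged, so the relative order
  // between "even" and "odd" lines is not deterministic.
  val program: IO[List[String]] =
    Stream
      .emits(List(0, 1, 2, 3, 4, 5))
      .covary[IO]
      .broadcastThrough(evens, odds)
      .compile
      .toList

  // Sort before printing so the output is stable across runs.
  def run: IO[Unit] =
    program.flatMap(lines => IO.println(lines.sorted.mkString("\n")))
}
```

Each input element ends up in exactly one output line here only because the two filters happen to be complementary. broadcastThrough itself does not enforce that.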


You could even go a step further and generalize the idea a little bit:

import cats.effect.Concurrent

// `in` handles the elements that satisfy the predicate, `out` handles the rest
def partition[F[_] : Concurrent, A, B](predicate: A => Boolean, in: Pipe[F, A, B], out: Pipe[F, A, B]): Pipe[F, A, B] =
  _.broadcastThrough[F, B](
    _.filter(predicate).through(in),
    _.filter(a => !predicate(a)).through(out)
  )

With that, the two branches are mutually exclusive by construction, so you no longer have to make sure that two separate predicates don't overlap.

With a slightly adapted saveFiltered:

def saveFiltered2(filename: Path): Pipe[IO, Double, Unit] =
  _.map(_.toString)
    .intersperse("\n") // one value per line, as in the original example
    .through(text.utf8.encode)
    .through(Files[IO].writeAll(filename))

the last part of the stream is a bit shorter:

...
.through(
  partition(n => n >= 0,
    saveFiltered2(Path("testdata/celsius_over.txt")),
    saveFiltered2(Path("testdata/celsius_below.txt"))))
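
If you want to try partition without touching the file system, a small in-memory sketch could look like this. It assumes fs2 3.x and cats-effect 3 and repeats the partition definition from above so that it is self-contained. PartitionDemo and the "over"/"below" labels are hypothetical names used only for this example.

```scala
import cats.effect.{Concurrent, IO, IOApp}
import fs2.{Pipe, Stream}

object PartitionDemo extends IOApp.Simple {

  // Same definition as above: `in` gets the matching elements, `out` gets the rest.
  def partition[F[_]: Concurrent, A, B](predicate: A => Boolean, in: Pipe[F, A, B], out: Pipe[F, A, B]): Pipe[F, A, B] =
    _.broadcastThrough[F, B](
      _.filter(predicate).through(in),
      _.filter(a => !predicate(a)).through(out)
    )

  // Label each value instead of writing it to a file.
  val program: IO[List[String]] =
    Stream(-5.0, 3.0, 12.5, -0.5)
      .covary[IO]
      .through(
        partition[IO, Double, String](
          _ >= 0,
          _.map(n => s"over: $n"),
          _.map(n => s"below: $n")
        )
      )
      .compile
      .toList

  // The interleaving of the two branches is not deterministic.
  def run: IO[Unit] =
    program.flatMap(lines => IO.println(lines.mkString("\n")))
}
```

Every input value appears exactly once in the result, labelled by the branch it went through. Swapping the two labelling pipes for saveFiltered2 gives the file-based version shown above.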