Handle large file with Stream in Scala


I have a large CSV file with user data. I have an endpoint that receives a user and should return a boolean indicating whether the user exists in the file or not. To avoid running out of memory I read the file as a stream and use getLines, which is lazy. This is my code:

  import cats.effect.IO
  import fs2.Stream
  import java.nio.file.Path

  def readUsers(filePath: Path): IO[Seq[User]] = {
    Stream
      .eval(IO(scala.io.Source.fromFile(filePath.toFile)))
      .flatMap(source => Stream.fromIterator[IO](source.getLines(), 64))
      .map(line => line.split(","))
      .map(cols => User(cols(0), cols(1), cols(2), cols(3)))
      .compile
      .toList
  }

  def isUserExists(user: User, users: IO[Seq[User]]): IO[Boolean] =
    users.map(_.contains(user))

I assign users a value when the app starts:

val users = readUsers(filePath)

and send it to the isUserExists function whenever the endpoint is called.
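
Roughly, the wiring looks like this (the HTTP layer is omitted and handleRequest is a simplified stand-in, not my real handler):

  // called for every incoming request
  def handleRequest(requestedUser: User): IO[Boolean] =
    isUserExists(requestedUser, users)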

My questions are:

  1. Does the toList keep the entire file in memory?
  2. If so, how can I avoid it? Should I remove the toList and iterate over each line in isUserExists instead?
  3. When the content of the file changes, will it be reflected in the endpoint, given that users is an IO value?

There are 2 answers

Alexey Rykhalskiy
import cats.effect.IO
import cats.implicits._
import fs2._
import fs2.io.file.Files
import fs2.io.file.Path
import java.nio.file.{Path => JPath}

object SO20230802 {

  val givenUser: User = ???

  def readUsers(filePath: JPath): IO[Boolean] =
    Files[IO]
      .readAll(Path.fromNioPath(filePath))
      .through(text.utf8.decode)
      .through(text.lines)
      .map(_.split(","))
      .map {
        case Array(a, b, c, d) => User(a, b, c, d).some
        case _                 => None
      }
      .unNone
      .dropWhile(_ != givenUser) // skip everything before the first match
      .take(1)                   // keep at most that one matching user
      .as(true)
      .lastOr(false)             // false if the stream ended without a match
      .compile
      .lastOrError

}

fs2.io will manage the resource (the underlying file handle) properly.
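
For completeness, here is one way to reuse that pipeline per request, parameterised by the user instead of the fixed givenUser placeholder. This is only a sketch: UserCheckApp, userExists and the file name are made-up names, it assumes the four-field User case class from the question, and it uses Stream#exists instead of the dropWhile/take/lastOr combination above.

import cats.effect.{ExitCode, IO, IOApp}
import fs2._
import fs2.io.file.{Files, Path}

object UserCheckApp extends IOApp {

  // Same pipeline as readUsers, but parameterised by the user we are looking for.
  def userExists(file: Path, user: User): IO[Boolean] =
    Files[IO]
      .readAll(file)
      .through(text.utf8.decode)
      .through(text.lines)
      .map(_.split(","))
      .collect { case Array(a, b, c, d) => User(a, b, c, d) }
      .exists(_ == user) // emits exactly one Boolean, stopping at the first match
      .compile
      .lastOrError

  def run(args: List[String]): IO[ExitCode] =
    userExists(Path("users.csv"), User("a", "b", "c", "d"))
      .flatMap(found => IO.println(s"user exists: $found"))
      .as(ExitCode.Success)
}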

Philluminati

Yeah, your problem is that you are turning the stream into a concrete and complete list. To benefit from streaming you have to use streaming functions and references as much as possible. I recommend that readUsers return an fs2.Stream[IO, User] and that you start treating Stream as a "major" type used in signatures throughout your application. As you get more familiar with Streams they should feel more natural to use.

  import java.nio.file._
  import cats._
  import cats.effect._
  import cats.effect.unsafe.implicits.global

  case class User(name: String)

  def readUsers(filePath: Path): fs2.Stream[IO, User] = {
    fs2.Stream
      .eval(IO(scala.io.Source.fromFile(filePath.toFile))) // please see fs2.io!!!
      .flatMap(source => fs2.Stream.fromIterator[IO](source.getLines(), 64))
      .map(line => line.split(","))
      .map(cols => User(cols(0) /* simplified example, cols(1), cols(2), cols(3) */))
      // removed lines turning the stream into a concrete thing.
  }

  def isUserExists(user: User, users: fs2.Stream[IO, User]): IO[Boolean] =
    users
      .exists(_ == user) // emits a single Boolean and stops at the first match
      .last
      .compile
      .toList
      .map(_.flatten.headOption.getOrElse(false))

  val userList = fs2.Stream.fromIterator[IO](List(
    User("henry"),
    User("donald")
  ).iterator, 64)

  isUserExists(User("donald"), userList)
    .map { ans => println(s"Got this: $ans") }
    .unsafeRunSync()


In terms of examples, this doesn't really show off the great features of fs2 streams, because exists() immediately drains the stream (up to the first match). I'm not sure how the code is used, but you want to focus on traversing the stream only once, so repeated isUserExists calls don't necessarily fit the best use case.
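
If each call really does need to hit the file, the pieces can be combined so that every isUserExists call re-runs a file-backed stream. This is only a sketch: usersFrom and the file name are mine, it uses fs2.io.file rather than scala.io.Source, and it sticks with the simplified one-field User from above.

  import fs2.{Stream, text}
  import fs2.io.file.{Files, Path => FPath} // aliased to avoid clashing with java.nio.file.Path above

  // Builds a fresh, lazily evaluated stream of users; nothing is read until it is compiled.
  def usersFrom(file: FPath): Stream[IO, User] =
    Files[IO]
      .readAll(file)
      .through(text.utf8.decode)
      .through(text.lines)
      .filter(_.nonEmpty)
      .map(name => User(name)) // one-column file, matching the simplified User above

  // Each of these re-opens, scans and closes the file when the IO is eventually run,
  // so changes to the file are picked up on the next lookup.
  val henryExists: IO[Boolean]  = isUserExists(User("henry"), usersFrom(FPath("users.csv")))
  val donaldExists: IO[Boolean] = isUserExists(User("donald"), usersFrom(FPath("users.csv")))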