Parallelizing operations within a ConnectionIO transaction


I have a program that reads a list of file paths from a database, deletes those files from the filesystem, and finally deletes the paths from the database. I put all of the operations inside one transaction so that the paths are deleted from the database if and only if all of the files are deleted from the filesystem.

Something like this:

val result = for {
  deletePath <- sql"select path from files".query[String].stream // Stream[ConnectionIO, String]
  _ <- Stream.eval(AsyncConnectionIO.liftIO(File(deletePath).delete())) // Stream[ConnectionIO, Unit]
  _ <- Stream.eval(sql"delete from files where path = $deletePath".update.run) // Stream[ConnectionIO, Int]
} yield ()

result.compile.drain.transact(transactor)

Unfortunately, the filesystem is distributed, which means each individual operation is slow, but it can handle many operations at once.

So my question is, how do I parallelize the filesystem deletion operation here?

Answer by Luis Miguel Mejía Suárez (best answer)

Yeah, you can. Just use appropriate combinators instead of the for syntax.

val result =
  sql"select path from files"
    .query[String]
    .stream
    // parEvalMapUnordered needs a Concurrent[ConnectionIO] instance in scope
    .parEvalMapUnordered(maxConcurrent = 64) { deletePath =>
      // delete the file first, then remove its row
      AsyncConnectionIO.liftIO(File(deletePath).delete()) >>
      sql"delete from files where path = $deletePath".update.run
    }

result.compile.drain.transact(transactor)

Remember to change the maxConcurrent parameter to something that makes sense for your use case.


(I couldn't test the code so it may have some typos)
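
If a Concurrent instance for ConnectionIO is not available in your doobie version (worth checking, since parEvalMapUnordered requires one), another option is to do the parallel part in IO and lift it into the transaction. The following is only a rough sketch under several assumptions: doobie 1.0.0-RC2 or later on cats-effect 3 (where WeakAsync.liftK exists), plain java.nio file deletion instead of the File wrapper from the question, and a hypothetical helper name deleteAll. Like the code above, it is untested.

```scala
import cats.effect.IO
import cats.effect.implicits._ // parTraverseN
import doobie._
import doobie.implicits._
import java.nio.file.{Files, Paths}

// Hypothetical helper: fetch all paths, delete the files in parallel,
// then delete the rows, all inside one transaction.
def deleteAll(xa: Transactor[IO], maxConcurrent: Int): IO[Unit] =
  // liftK yields a natural transformation IO ~> ConnectionIO
  WeakAsync.liftK[IO, ConnectionIO].use { toConnectionIO =>
    val program: ConnectionIO[Unit] = for {
      paths <- sql"select path from files".query[String].to[List]
      // At most maxConcurrent deletions run at once; if any of them fails,
      // the ConnectionIO fails and the transaction is rolled back.
      _ <- toConnectionIO(
             paths
               .parTraverseN(maxConcurrent)(p => IO.blocking(Files.delete(Paths.get(p))))
               .void
           )
      // Remove only the rows whose files were just deleted.
      _ <- Update[String]("delete from files where path = ?").updateMany(paths)
    } yield ()
    program.transact(xa)
  }
```

Here the database work stays sequential on the single connection and only the filesystem calls run concurrently. Note that a rollback undoes the row deletions but cannot restore files that were already deleted.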