I'm trying to write a simple program to take lines from std in, parse them, and insert a single record into a postgres database for each line. To test, I've been running it against a file with just cat my_file | java ...
Here's the code:
import scala.concurrent.ExecutionContext.Implicits.global
import scala.io.Source
import scala.util.Failure
import scala.util.Success
import slick.driver.PostgresDriver.api._
object LoadToDB extends App {
val db = Database.forConfig("dev-ingest")
for (line <- Source.fromInputStream(System.in).getLines()) {
val record = parse(line)
val insert = TableQuery[MyTable] += record
val fut = db.run(insert)
fut onComplete {
case Success(x) => {
System.out.println("Inserted one record: " + record)
}
case Failure(e) => {
e.printStackTrace()
}
}
}
}
In theory, the number of lines in the file, the number of "Inserted one record" statements printed, and the number of records in the database should all match. However, they are all different. There are more lines in the file than records in the database, and more records in the database than "Inserted" statements printed.
I'm a little bit new to scala/slick's async execution model, so my suspicion is I'm doing something wrong there. Perhaps when the main thread ends, all remaining threads don't get a chance to complete their execution? Is there some way to say "wait for all submitted tasks to finish"? I tried Await.result(db.shutdown(), Duration.Inf)
but that seems to prevent tasks from running to completion and just kills what it can immediately.
Your program is exiting before the futures complete. You have to block on them somewhere.