Applying multiple map functions to streaming database results in Play 2.6

I have a large query that seems to be a prime candidate for streaming results.

I would like to call a function that returns an object to which I can apply additional map transformations, and then ultimately convert the entire result into a list. The conversions will result in a set of objects much smaller than the rows in the database, and there are many different transformations that must take place sequentially, so processing one result at a time would save significant memory.
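The memory argument can be illustrated with the standard library alone. The following is only a sketch: `Row`, `LazyMapSketch`, and `rows` are hypothetical stand-ins for a real result set, not part of Anorm or Play. It shows that chained `map` calls on an `Iterator` pull one element at a time, so only the small converted values end up in the final list.

```scala
// Hypothetical stand-in for a database row; not part of any library.
final case class Row(id: Long, bigColumn: String)

object LazyMapSketch {
  // Counts how many rows have been pulled from the simulated result set.
  var rowsRead = 0

  // Simulated cursor: each row is built only when the consumer asks for it.
  def rows(n: Int): Iterator[Row] =
    Iterator.tabulate(n) { i =>
      rowsRead += 1
      Row(i.toLong, "x" * (i + 1))
    }

  // Two sequential conversions, applied element by element.
  def convert(n: Int): List[Double] =
    rows(n)
      .map(row => row.bigColumn.length) // String => Int
      .map(len => len / 2.0)            // Int => Double
      .toList                           // only the small results are retained
}
```

Nothing is read until `toList` runs, and each large `Row` becomes eligible for garbage collection as soon as its converted value has been produced.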

For example, if the results from the database were a stream (though the right abstraction is probably an Akka Stream or an Iteratee), then I could do something like:

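def outer(converter1: String => Int, converter2: Int => Double): List[Double] = {
    val sqlIterator = getSqlIterator()
    // First conversion: extract the large column and shrink it to an Int
    val mappedIterator1 = sqlIterator.map(x => converter1(x.bigColumn))
    // Second conversion: applied to the output of the first, not to the raw rows
    val mappedIterator2 = mappedIterator1.map(x => converter2(x))
    // Only the small converted values are collected into the list
    mappedIterator2.toList
}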

def getSqlIterator() = {
   val selectedObjects = SQL( """SELECT * FROM table""").map { x => 
       val id = x[Long]("id")
       val tinyColumn = x[String]("tiny_column")
       val bigColumn = x[String]("big_column")

       NewObject(id, tinyColumn, bigColumn)
   } 

   val transformed = UNKNOWN_FUNCTION(selectedObjects)
   transformed
}

Most of the documentation appears to describe how to apply a "reduce" function to the results rather than a "map" function, but the mapped results will be much smaller than the original rows, saving me significant memory. What should I use for UNKNOWN_FUNCTION?

There is 1 answer

Jeffrey Chung

The following is a simple example of using Anorm's Akka Streams support to read the values from a single column of type String, applying two transformations to each element, and placing the results in a Seq. I'll leave it as an exercise for you to retrieve the values from multiple columns at a time, if that's what you need.

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink

import anorm._

import scala.collection.immutable.Seq
import scala.concurrent.Future

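implicit val system = ActorSystem("MySystem")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

// AkkaStream.source also needs an implicit JDBC connection in scope;
// how it is obtained (for example from Play's Database API) is left open here.
implicit val connection: java.sql.Connection = ???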

val convertStringToInt: String => Int = ???
val convertIntToDouble: Int => Double = ???

val result: Future[Seq[Double]] =
  AkkaStream.source(SQL"SELECT big_column FROM table", SqlParser.scalar[String])
    .map(convertStringToInt)
    .map(convertIntToDouble)
    .runWith(Sink.seq[Double])