How to convert Sequence of Future tasks to Enumerator, that would consume latest complete tasks

218 views Asked by At

I'm making expensive calculations, that I do in batch, and return chunked HTTP response. Periodically there are some calculations that are "more expensive then the others" and it might cause timeouts on Client.

Currently list of calculations combined in Enumerator like this:

def processBatch(batch: List[B]): Enumerator[A] = {
    val listOfCalculations:List[Future[A]] = batch.map(....)
    val futureList: Future[List[A]] = Future.sequence(listOfFutures)
    Enumerator.flatten(futureList.map(Enumerator.enumerate(_)))
}

With this implementation, Enumerator has input only after Future list is complete. This is not good, because there might be some ready values, waiting for an expensive task to complete.

I wonder if there is a better way to do this - in terms of feeding values to Enumerator as soon as they complete, and not as a batch, on all Futures complition?

2

There are 2 answers

0
johanandren On BEST ANSWER

Future.sequence means "the list of all these futures when they have completed" so you cannot use that.

There is a way to get a more imperative enumerator in play.api.libs.iteratee.Concurrent.unicast which you could use, so as soon as they get a result, they push it into the a Channel. This will represent imperative pushing as an enumerator that you can consume with an iteratee.

This way the first complete result arrives first etc.

You can find the docs for it here

0
lmm On

You could use Enumerator.unfoldM. Something like:

val listOfCalculations:List[Future[A]] = ...
Enumerator.unfoldM(0) { i =>
  if(i < listOfCalculations.length)
    listOfCalculations(i).map(a => Some((i + 1, a)))
  else Future.successful(None)
}

This is kind of like a fold. i keeps track of our position in the list; each time our mapper is called we get the next future from the list and return it with the new index.