Scala Futures with multiple dependencies


I need to asynchronously compute a set of features that can depend on one another (the dependency graph has no cycles). For example:

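  import scala.util.Random

  // Random source used to simulate a computation of variable duration.
  val r = new Random

  class FeatureEncoderMock(val n: String, val deps: List[String] = List.empty) {
    def compute: Unit = {
      println(s"starting computation feature $n")
      Thread.sleep(r.nextInt(2500))
      println(s"end computation feature $n")
    }
  }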

  val registry = Map(
        "feat1" -> new FeatureEncoderMock("feat1", List("factLogA", "factLogB")),
        "factLogA" -> new FeatureEncoderMock("factLogA"),
        "factLogB" -> new FeatureEncoderMock("factLogB"),
        "feat2" -> new FeatureEncoderMock("feat2", List("factLogA")),
        "feat3" -> new FeatureEncoderMock("feat3", List("feat1")),
        "feat4" -> new FeatureEncoderMock("feat4", List("feat3", "factLogB"))
  )

What I want is to call a single function on feat4 that triggers the computation of every feature it depends on, directly or transitively, and respects the dependencies among them. I tried this:

def run(): Unit = {
  val requested = "feat4"

  val allFeatures = getChainOfDependencies(requested)

  val promises = allFeatures.zip(Seq.fill(allFeatures.size)(Promise[Unit])).toMap

  def computeWithDependencies(f: String) = Future {
    println(s"computing $f")
    val encoder = registry(f)

    if (encoder.deps.isEmpty) {
      promises(f).success(registry(f).compute)
    } else {
      val depTasks = promises.filterKeys(encoder.deps.contains)

      val depTasksFuture = Future.sequence(depTasks.map(_._2.future))

      depTasksFuture.onSuccess({
        case _ =>
          println(s"all deps for $f has been computed")
          promises(f).success(registry(f).compute)
          println(s"done for $f")
      })
    }
  }

  computeWithDependencies(requested)
}

But I cannot understand why the order of execution is not as expected. I am not sure what the proper way is to feed a future into a promise, and I am fairly sure that is the part of this code that is wrong.
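
On the narrower point of feeding a future into a promise: the standard library provides Promise.completeWith, which completes a promise with whatever result another future eventually produces. The following is only a minimal sketch of that mechanism, not a fix for the code above. The names PromiseSketch, demo, record, "dep" and "feat" are made up for illustration. Note that something still has to start the work for every dependency. As far as the snippet above shows, only computeWithDependencies(requested) is ever called, so the promises of the dependencies may never be completed.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PromiseSketch {
  // Returns the order in which two hypothetical tasks ran.
  def demo(): List[String] = {
    val log = ListBuffer.empty[String]
    // Guard the buffer, since the tasks run on pool threads.
    def record(name: String): Unit = log.synchronized { log += name; () }

    val depPromise  = Promise[Unit]()
    val featPromise = Promise[Unit]()

    // "feat" is chained onto dep's future, so it cannot start before dep completes.
    featPromise.completeWith(depPromise.future.map(_ => record("feat")))

    // Something must actually run "dep" and complete its promise.
    depPromise.completeWith(Future(record("dep")))

    Await.result(featPromise.future, 5.seconds)
    log.synchronized(log.toList)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

In this sketch "feat" is registered first but still runs second, because its work is attached to the future of depPromise rather than started eagerly.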

Answer by Joe K (best answer)

I think you're overthinking it with the promises; Future composition is probably all that you need. Something like this:

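import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

def computeWithDependencies(s: String, cache: mutable.Map[String, Future[Unit]] = mutable.Map.empty)
                           (implicit ec: ExecutionContext): Future[Unit] = {
  cache.get(s) match {
    // Already scheduled: reuse the same future instead of computing again.
    case Some(f) => f
    case None => {
      val encoder = registry(s)
      // Schedule every dependency first (recursively), sharing the cache.
      val depsFutures = encoder.deps.map(d => computeWithDependencies(d, cache))
      // Start this feature's computation only after all dependencies have completed.
      val result = Future.sequence(depsFutures).flatMap(_ => Future { encoder.compute })
      cache += s -> result
      result
    }
  }
}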

The call to flatMap ensures that all of the dependency futures complete before the "current" future executes, even though their combined result (a List[Unit]) is ignored. The cache is only there to prevent recomputation when the dependency graph contains a "diamond", that is, a feature reachable through more than one path. You can leave it out if the graph has no diamonds or if you're ok with recomputing. Anyway, when running this:

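import scala.concurrent.{Await, ExecutionContext}, scala.concurrent.duration._
implicit val ec: ExecutionContext = ExecutionContext.global
val futureResult = computeWithDependencies("feat4")
Await.result(futureResult, 30.seconds)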

I see this output:

starting computation feature factLogB
starting computation feature factLogA
end computation feature factLogB
end computation feature factLogA
starting computation feature feat1
end computation feature feat1
starting computation feature feat3
end computation feature feat3
starting computation feature feat4
end computation feature feat4

This seems correct to me: every feature starts only after its dependencies have finished.
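
The point about the cache and "diamonds" can be checked with a small self-contained sketch. It uses the same Future.sequence(...).flatMap(...) pattern as the answer, but on a made-up four-node graph instead of the registry, and it counts how often each node is computed. The names DiamondSketch, deps, run and useCache are assumptions introduced only for this illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object DiamondSketch {
  // Hypothetical diamond: d depends on b and c, which both depend on a.
  val deps: Map[String, List[String]] =
    Map("a" -> Nil, "b" -> List("a"), "c" -> List("a"), "d" -> List("b", "c"))

  // Computes "d" and returns how many times each node's computation ran.
  def run(useCache: Boolean): Map[String, Int] = {
    val counts = deps.keys.map(k => k -> new AtomicInteger(0)).toMap
    val cache  = mutable.Map.empty[String, Future[Unit]]

    def compute(node: String): Future[Unit] =
      cache.get(node).filter(_ => useCache).getOrElse {
        // Dependencies are scheduled first, synchronously on the calling thread.
        val depFutures = deps(node).map(compute)
        // The node itself runs only after all of its dependencies have completed.
        val result = Future.sequence(depFutures).flatMap { _ =>
          Future { counts(node).incrementAndGet(); () }
        }
        cache(node) = result
        result
      }

    Await.result(compute("d"), 5.seconds)
    counts.map { case (k, v) => k -> v.get }
  }
}
```

With useCache = true every node is computed exactly once. With useCache = false the shared node "a" is computed twice, once for each path that reaches it. In this sketch the mutable map is read and written only during the synchronous recursion, never from inside a Future body, so it is not accessed concurrently.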