I have to asynchronously compute a set of features that can depend on one another (the dependency graph has no cycles). For example:
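import scala.util.Random

// Shared random source for the simulated work below.
val r = new Random

class FeatureEncoderMock(val n: String, val deps: List[String] = List.empty) {
  // Simulates a computation that takes up to 2.5 seconds.
  def compute: Unit = {
    println(s"starting computation feature $n")
    Thread.sleep(r.nextInt(2500))
    println(s"end computation feature $n")
  }
}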
val registry = Map(
  "factLogA" -> new FeatureEncoderMock("factLogA"),
  "factLogB" -> new FeatureEncoderMock("factLogB"),
  "feat1" -> new FeatureEncoderMock("feat1", List("factLogA", "factLogB")),
  "feat2" -> new FeatureEncoderMock("feat2", List("factLogA")),
  "feat3" -> new FeatureEncoderMock("feat3", List("feat1")),
  "feat4" -> new FeatureEncoderMock("feat4", List("feat3", "factLogB"))
)
What I want to achieve is to call a single function on feat4 that triggers the computation of every feature it depends on, directly or transitively, and respects the dependencies among them. I tried this:
def run(): Unit = {
  val requested = "feat4"
  val allFeatures = getChainOfDependencies(requested)
  val promises = allFeatures.zip(Seq.fill(allFeatures.size)(Promise[Unit])).toMap
  def computeWithDependencies(f: String) = Future {
    println(s"computing $f")
    val encoder = registry(f)
    if (encoder.deps.isEmpty) {
      promises(f).success(registry(f).compute)
    }
    else {
      val depTasks = promises.filterKeys(encoder.deps.contains)
      val depTasksFuture = Future.sequence(depTasks.map(_._2.future))
      depTasksFuture.onSuccess({
        case _ =>
          println(s"all deps for $f has been computed")
          promises(f).success(registry(f).compute)
          println(s"done for $f")
      })
    }
  }
  computeWithDependencies(requested)
}
But I cannot understand why the order of execution is not what I expect. I am not sure what the proper way is to feed a future into a promise, and I am fairly sure that this part of the code is wrong.
I think you're overthinking it with the promises; Future composition is probably all that you need. Something like this:
The call to flatMap ensures that all of the dependency futures complete before the "current" future executes, even if the result (a List[Unit]) is ignored. The business with the cache is just to prevent recomputation if the dependency graph has a "diamond" in it, but it could be left out if it won't or if you're OK with recomputing. Anyway, when running this:
I see this output:
Which seems correct to me.
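For illustration, here is a minimal sketch of the Future-composition idea described above: sequence the dependency futures, flatMap into the feature's own computation, and cache futures by name so a "diamond" is only computed once. This is a reconstruction under assumptions and not necessarily the code this answer originally contained. The names FeatureGraphSketch, Encoder, runSketch and the log parameter are hypothetical, and the sleep is shortened so the sketch finishes quickly.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Random

object FeatureGraphSketch {
  // Hypothetical stand-in for FeatureEncoderMock: instead of printing, it
  // appends its name to a thread-safe log when its simulated work finishes.
  final class Encoder(val name: String, val deps: List[String] = Nil) {
    def compute(log: ConcurrentLinkedQueue[String]): Unit = {
      Thread.sleep(Random.nextInt(50).toLong) // simulated work
      log.add(name)
    }
  }

  val registry: Map[String, Encoder] = List(
    new Encoder("factLogA"),
    new Encoder("factLogB"),
    new Encoder("feat1", List("factLogA", "factLogB")),
    new Encoder("feat2", List("factLogA")),
    new Encoder("feat3", List("feat1")),
    new Encoder("feat4", List("feat3", "factLogB"))
  ).map(e => e.name -> e).toMap

  // Builds the future for `name` out of the futures of its dependencies.
  // The recursion runs synchronously on the calling thread, so a plain
  // mutable.Map is enough for the cache in this sketch.
  def computeWithDependencies(
      name: String,
      log: ConcurrentLinkedQueue[String],
      cache: mutable.Map[String, Future[Unit]] = mutable.Map.empty
  )(implicit ec: ExecutionContext): Future[Unit] =
    cache.get(name) match {
      case Some(existing) => existing // already scheduled: reuse it
      case None =>
        val encoder = registry(name)
        val deps = encoder.deps.map(d => computeWithDependencies(d, log, cache))
        // The feature's own work starts only after every dependency completes.
        val result = Future.sequence(deps).flatMap(_ => Future(encoder.compute(log)))
        cache.update(name, result)
        result
    }

  // Computes feat4 and returns the order in which features finished.
  def runSketch(): List[String] = {
    import ExecutionContext.Implicits.global
    val log = new ConcurrentLinkedQueue[String]()
    Await.result(computeWithDependencies("feat4", log), 10.seconds)
    log.toArray(new Array[String](0)).toList
  }
}
```

Calling FeatureGraphSketch.runSketch() should return factLogA and factLogB in either order, followed by feat1, feat3 and feat4; feat2 is never computed because feat4 does not depend on it. By contrast, the code in the question only ever calls computeWithDependencies for the requested feature, so as far as I can tell nothing completes the promises of its dependencies.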