I have a stream that picks up tasks from the DB and then runs them through a few steps:
step1 ~> step2 ~> step3 and so forth. Step 2 performs a REST API call to another service, which sometimes takes a long time to respond, or the service I'm calling is down.
My question is: what reference does the stream keep to a task in those cases? I've found some weird behaviour that looks like a data leak: some tasks end up with data from another task, which is extremely weird, and if you look at the times those tasks arrived, they are almost identical. So I suspect that one task is waiting for the REST API call to respond, and by the time the response arrives, the thread no longer holds a reference to that task, has picked up a new task instead, and the data gets mixed up.
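On the question of references: in Scala, a `Future` callback closes over the values it uses, so a task is not tied to whichever thread started the call. If step 2 is built on Akka Streams' `mapAsync` (an assumption, since that step is not shown), the same holds, and `mapAsync` emits its results in upstream order. The plain-Scala sketch below (no Akka; `DemoTask`, `slowCall` and `callService` are hypothetical stand-ins for the real task and REST step) fires several slow calls that complete out of order and keeps each response attached to the task that requested it.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object ClosureCapture {
  // Hypothetical, simplified stand-in for the real Task document.
  final case class DemoTask(id: Int, response: Option[String] = None)

  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical slow REST call: the delay depends on the id, so calls finish out of order.
  def slowCall(id: Int): Future[String] = Future {
    blocking(Thread.sleep((5 - id % 5) * 20L))
    s"response-for-$id"
  }

  // `task` is captured by the closure passed to `map`, so the response is attached to the
  // task that issued the call, whichever thread happens to complete the Future.
  def callService(task: DemoTask): Future[DemoTask] =
    slowCall(task.id).map(resp => task.copy(response = Some(resp)))

  // Starts all calls concurrently; Future.traverse returns the results in input order.
  def runAll(n: Int): Seq[DemoTask] =
    Await.result(Future.traverse((1 to n).map(DemoTask(_)))(callService), 10.seconds)
}
```

In other words, a slow or failing call on its own should not hand one task's response to another task; something in the graph has to pair elements up incorrectly.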
My stream:
```scala
class TaskMainStream @Inject()(mainGraph: TaskMainGraph,
                               source: TaskFullFlowSource,
                               override val actorSystem: ActorSystem)
                              (override implicit val exec: ExecutionContext,
                               override implicit val mat: Materializer) extends StaticBuilderGraph {

  val flow: Flow[GraphElement[Task], GraphElement[Task], NotUsed] = mainGraph.mainFlow

  def runGraph(switchSharedKill: SharedKillSwitch) = {
    graph(source.tasks(), flow, Sink.ignore, switchSharedKill).run()
  }
}
```
StaticBuilderGraph, which holds the graph definition:
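```scala
import akka.actor.ActorSystem
import akka.stream.{Materializer, SharedKillSwitch}
import akka.stream.ActorAttributes.supervisionStrategy
import akka.stream.Supervision.resumingDecider
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.ExecutionContext

trait StaticBuilderGraph {
  def actorSystem: ActorSystem
  implicit def exec: ExecutionContext
  implicit def mat: Materializer

  def graph[T1, T2, M1, M2](source: Source[T1, M1], flow: Flow[T1, T2, _], sink: Sink[T2, M2], switch: SharedKillSwitch): RunnableGraph[(SharedKillSwitch, M2)] = {
    // resumingDecider: an element whose processing throws is dropped and the stream continues
    source.viaMat(switch.flow)(Keep.right).via(flow).toMat(sink)(Keep.both).withAttributes(supervisionStrategy(resumingDecider))
  }
}
```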
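The `resumingDecider` set on the whole graph in `StaticBuilderGraph.graph` matters here: under `Supervision.Resume`, an element whose processing throws (or, in `mapAsync`, whose `Future` fails) is dropped and the stream carries on. A minimal plain-Scala model of that behaviour for a simple map stage; `ResumeSemantics.mapWithResume` is a hypothetical helper, not an Akka API.

```scala
import scala.util.Try

// Plain-Scala model (not Akka code) of a map stage running under a "resume" decision:
// if the function throws for an element, that element is dropped and the rest continue.
object ResumeSemantics {
  def mapWithResume[A, B](in: Seq[A])(f: A => B): Seq[B] =
    in.flatMap(a => Try(f(a)).toOption) // a failed element produces nothing downstream
}
```

For example, `ResumeSemantics.mapWithResume(Seq(1, 2, 3))(x => if (x == 2) sys.error("down") else x)` gives `Seq(1, 3)`. So, unless a step catches the error itself (for example into `GraphElement.errors`), a task whose call fails may silently disappear from that stage rather than fail the stream.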
The main flow (shortened):
```scala
def mainFlow: Flow[GraphElement[Task], GraphElement[Task], NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._ // brings ~> into scope

  val fetchDataStep = builder.add(parallelFetchFlows)
  val stepWithApiCallToAnotherService = builder.add(stepWithApiCallToAnotherServiceFlow)
  val completeTask = builder.add(completeTaskFlow)
  // more steps exist here (goalRouter, shouldSkipNextStep, something, finalStep); omitted to keep the example short

  fetchDataStep ~> shouldSkipNextStep
  shouldSkipNextStep.out(Yes) ~> something ~> completeTask ~> finalStep
  shouldSkipNextStep.out(No) ~> stepWithApiCallToAnotherService ~> finalStep

  FlowShape(goalRouter.in, finalStep.out)
})
```
This is GraphElement, just a case class that holds the task and any errors that occur:
```scala
case class GraphElement[T](task: T, errors: List[Throwable])
```
I also want to share parallelFetchFlows, because I suspect the issue might be there:
```scala
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, ZipWith}

private def parallelFetchFlows = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._ // brings ~> into scope

  val fanOut = builder.add(Broadcast[GraphElement[Model]](2))
  // ZipWith combines the next element from in0 with the next element from in1
  val zip = builder.add(ZipWith[GraphElement[Model], GraphElement[Model], GraphElement[Model]] {
    case (GraphElement(taskLeft, errorsLeft), GraphElement(taskRight, errorsRight)) =>
      GraphElement(taskLeft.copy(kids = taskRight.kids, steps = (taskLeft.steps ::: taskRight.steps).distinct), errorsLeft ::: errorsRight)
  })

  fanOut.out(0) ~> fetchPersonFlow ~> zip.in0
  fanOut.out(1) ~> fetchKidsFlow ~> zip.in1

  FlowShape(fanOut.in, zip.out)
})
```
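One mechanism that would produce exactly the symptom described (one task carrying another task's data) is positional pairing: `Broadcast` sends each element to both branches, and `ZipWith` combines the next element from `in0` with the next element from `in1` without checking which task each belongs to. If one branch drops an element (for example under the resuming decider) or reorders elements (for example with `mapAsyncUnordered`), every later pair is shifted. The plain-Scala model below assumes the kids fetch for task 2 was dropped; `PositionalZip` and `Fetched` are hypothetical names.

```scala
// Plain-Scala model (not Akka code) of Broadcast(2) ~> two fetch branches ~> ZipWith.
// Like ZipWith, it pairs elements by position and never compares task ids.
object PositionalZip {
  // Hypothetical result of one fetch branch, tagged with the task it was fetched for.
  final case class Fetched(taskId: Int, value: String)

  // Returns (taskId from in0, taskId from in1) for each pair that would be emitted.
  def zipBranches(in0: Seq[Fetched], in1: Seq[Fetched]): Seq[(Int, Int)] =
    in0.zip(in1).map { case (l, r) => (l.taskId, r.taskId) }

  val taskIds = Seq(1, 2, 3, 4)
  val persons = taskIds.map(id => Fetched(id, s"person-$id"))
  // Assumption for the demo: the kids fetch for task 2 failed and its element was dropped.
  val kids = taskIds.filterNot(_ == 2).map(id => Fetched(id, s"kids-$id"))

  val paired: Seq[(Int, Int)] = zipBranches(persons, kids)
}
```

Here `paired` is `Seq((1, 1), (2, 3), (3, 4))`: task 2's person is merged with task 3's kids, and task 4's person is left waiting for a partner.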
A task is my DB document, which contains the person and the kids.
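Since each task holds both the person and the kids, one alternative worth sketching is to fetch both for the same task inside a single asynchronous step, so the results can only be merged back into the task that requested them. The sketch uses hypothetical `Doc`, `fetchPerson` and `fetchKids` stand-ins, because the real `Model`, `fetchPersonFlow` and `fetchKidsFlow` are not shown.

```scala
import scala.concurrent.{ExecutionContext, Future}

object CombinedFetch {
  // Hypothetical, simplified document: the real Model/Task classes are not shown.
  final case class Doc(id: Int, person: Option[String] = None, kids: List[String] = Nil)

  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-ins for what fetchPersonFlow and fetchKidsFlow do per element.
  def fetchPerson(id: Int): Future[String] = Future(s"person-$id")
  def fetchKids(id: Int): Future[List[String]] = Future(List(s"kid-$id-a", s"kid-$id-b"))

  // Both fetches start for the same `doc`, and their results are copied back into that
  // same `doc`, so a person or kids list can never be merged into a different task.
  def enrich(doc: Doc): Future[Doc] =
    fetchPerson(doc.id).zip(fetchKids(doc.id)).map { case (person, kids) =>
      doc.copy(person = Some(person), kids = kids)
    }
}
```

In Akka Streams, a function like `enrich` could back a single `mapAsync(parallelism)` stage, which preserves upstream order, in place of the `Broadcast`/`ZipWith` pair.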
Any hints will help. Tell me if you need more info to understand the problem better; I'm really confused.
Thanks!