I have a small problem.
    for
      map <- kafkaEventHoldLine.get // Ref[F, List[String]]
      key = dr.derived + dr.metricId.toString
      _ <- if !map.contains(key) then
             for
               _ <- addToHoldLine(key)
               process <- outlierProcessor.process(dr).timed
               (timeProcess, ol) = process
               _ <- info("Outlier.process", timeProcess.toMillis.named("duration"), dr.derived.named("sid"), dr.metricId.toString.named("mid"))
               append <- ol.traverse_(odt.append).timed
               (timeAppend, _) = append
               _ <- info("Outlier.append", timeAppend.toMillis.named("duration"), dr.derived.named("sid"), dr.metricId.toString.named("mid"))
               _ <- counters.processOutlier.observeTime(timeProcess + timeAppend)
               _ <- clearHoldLine(key)
             yield ()
           else
             Async[F].sleep(delayHoldLine.milliseconds) >> outlierProcess(dr)
    yield ()
This code runs concurrently, but I want this part:

    for
      map <- kafkaEventHoldLine.get // Ref[F, List[String]]
      key = dr.derived + dr.metricId.toString
      _ <- if !map.contains(key) then
             for
               _ <- addToHoldLine(key)

to be synchronized between threads.
Basically, I'm afraid of reading outdated data from the Ref in another thread, so I want all other threads to wait until the working thread finishes the addToHoldLine function. After it runs, the state of the Ref has changed and map.contains(key) returns true.
It's not clear from your question whether this really needs to run on a single thread, or whether it's enough for the code to ensure that only one instance runs at a time.

The tool for limiting concurrent work is Semaphore. Construct one in the factory for the class that holds the logic you want to limit, and then use permit around the specific work you want to limit.

If you need to guarantee that all work happens on the same thread, then construct a single-threaded ExecutionContext and use evalOn to shift the work onto that EC.
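
As a rough illustration of the Semaphore suggestion, here is a minimal sketch assuming cats-effect 3 and Scala 3. It uses IO instead of a generic F to stay short. The names HoldLine, tryAcquire, and release are hypothetical stand-ins, not the question's actual addToHoldLine and clearHoldLine. The check and the update both run while holding the single permit, so only one fiber at a time can see the key as absent and then add it.

```scala
import cats.effect.{IO, IOApp, Ref}
import cats.effect.std.Semaphore
import cats.syntax.all.*

object HoldLineSketch extends IOApp.Simple:

  // Hypothetical stand-in for the class that owns kafkaEventHoldLine.
  final class HoldLine(guard: Semaphore[IO], held: Ref[IO, List[String]]):

    // The check and the add happen under the permit, so no other fiber
    // can read the list between our `get` and our `update`.
    def tryAcquire(key: String): IO[Boolean] =
      guard.permit.use { _ =>
        held.get.flatMap { keys =>
          if keys.contains(key) then IO.pure(false)
          else held.update(key :: _).as(true)
        }
      }

    // Removing a key is a single atomic Ref update.
    def release(key: String): IO[Unit] =
      held.update(_.filterNot(_ == key))

  object HoldLine:
    // Build the semaphore once, in the factory, so every caller shares it.
    def create: IO[HoldLine] =
      for
        guard <- Semaphore[IO](1)
        held  <- Ref.of[IO, List[String]](Nil)
      yield new HoldLine(guard, held)

  val run: IO[Unit] =
    for
      line <- HoldLine.create
      // Ten fibers race for the same key; only one should get `true`.
      wins <- List.fill(10)("sid-1").parTraverse(line.tryAcquire)
      _    <- IO.println(s"winners: ${wins.count(identity)}")
    yield ()
```

In the question's code, the caller that gets `true` would go on to process and then release the key, while the others would sleep and retry as they do now. Only the short check-and-add step is held under the permit, so the outlier processing itself can still run concurrently for different keys.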
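
For the single-thread variant, a sketch along these lines may help, again assuming cats-effect 3 with IO. The name singleThreadEc is made up for the example. The executor is wrapped in a Resource so that it is shut down when no longer needed.

```scala
import cats.effect.{IO, IOApp, Resource}
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

object SingleThreadSketch extends IOApp.Simple:

  // Hypothetical helper: a dedicated single-threaded ExecutionContext
  // whose underlying executor is shut down on release.
  val singleThreadEc: Resource[IO, ExecutionContext] =
    Resource
      .make(IO(Executors.newSingleThreadExecutor()))(es => IO(es.shutdown()))
      .map(es => ExecutionContext.fromExecutorService(es))

  // Reports which thread the effect ran on after shifting it with evalOn.
  def threadName(ec: ExecutionContext): IO[String] =
    IO(Thread.currentThread().getName).evalOn(ec)

  val run: IO[Unit] =
    singleThreadEc.use { ec =>
      for
        first  <- threadName(ec)
        second <- threadName(ec)
        _      <- IO.println(s"same thread: ${first == second}")
      yield ()
    }
```

Keep in mind that evalOn controls where the work runs. Fibers sharing that thread may still interleave at asynchronous boundaries such as a sleep, so when the goal is only "one at a time", the Semaphore approach is probably the more direct fit.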