How to synchronize a bunch of effects in only one thread at a time

I have a small problem.

      for
        map <- kafkaEventHoldLine.get // Ref[F, List[String]]
        key = dr.derived + dr.metricId.toString
        _ <- if !map.contains(key) then
          for
            _ <- addToHoldLine(key)
            process <- outlierProcessor.process(dr).timed
            (timeProcess, ol) = process
            _ <- info("Outlier.process", timeProcess.toMillis.named("duration"), dr.derived.named("sid"), dr.metricId.toString.named("mid"))
            append <- ol.traverse_(odt.append).timed
            (timeAppend, _) = append
            _ <- info("Outlier.append", timeAppend.toMillis.named("duration"), dr.derived.named("sid"), dr.metricId.toString.named("mid"))
            _ <- counters.processOutlier.observeTime(timeProcess + timeAppend)
            _ <- clearHoldLine(key)
          yield ()
        else
          Async[F].sleep(delayHoldLine.milliseconds) >> outlierProcess(dr)
      yield ()

This code runs concurrently, but I want to have this part:

      for
        map <- kafkaEventHoldLine.get // Ref[F, List[String]]
        key = dr.derived + dr.metricId.toString
        _ <- if !map.contains(key) then
          for
            _ <- addToHoldLine(key)

be synchronized between threads.

So I'm basically afraid of reading stale data from the Ref in another thread. I want all other threads to wait until the working thread finishes addToHoldLine, because that call mutates the state of the Ref so that map.contains(key) returns true.

There is 1 answer

Answer by Daenyth

It's not clear from your question whether this really all needs to run on one thread, or whether it's enough to ensure that only one invocation runs at a time.

The tool for limiting concurrent work is Semaphore. Construct one with a single permit in the factory for the class that holds the logic you want to limit, and then wrap the specific work you want to limit in permit.

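import cats.effect.IO
import cats.effect.std.Semaphore

object MyLogic:
  // A semaphore with a single permit lets one caller through at a time.
  def create: IO[MyLogic] = Semaphore[IO](1).map(new MyLogic(_))

class MyLogic(sem: Semaphore[IO]):
  def doWork =
    // Other fibers wait here until the permit is released.
    sem.permit.surround {
      for
        map <- kafkaEventHoldLine.get
        // ...
      yield ()
    }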
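To make the Semaphore approach concrete, here is a minimal self-contained sketch, assuming cats-effect 3 and Scala 3. The names HoldLine, tryClaim, release and Demo are hypothetical stand-ins, not the asker's actual code; the point is only that the check and the update happen while the single permit is held.

```scala
import cats.effect.{IO, IOApp, Ref}
import cats.effect.std.Semaphore
import cats.syntax.all.*

// Hypothetical stand-in for the question's hold line:
// a Ref of keys guarded by a one-permit semaphore.
final class HoldLine(keys: Ref[IO, List[String]], sem: Semaphore[IO]):
  // Returns true if this caller claimed the key, false if it was already held.
  // The read and the write both happen while the permit is held.
  def tryClaim(key: String): IO[Boolean] =
    sem.permit.surround {
      for
        current <- keys.get
        claimed = !current.contains(key)
        _ <- if claimed then keys.update(key :: _) else IO.unit
      yield claimed
    }

  // Removes the key so that a later caller can claim it again.
  def release(key: String): IO[Unit] =
    sem.permit.surround(keys.update(_.filterNot(_ == key)))

object HoldLine:
  def create: IO[HoldLine] =
    for
      keys <- Ref.of[IO, List[String]](Nil)
      sem <- Semaphore[IO](1)
    yield new HoldLine(keys, sem)

object Demo extends IOApp.Simple:
  // Ten fibers race for the same key; only one of them should claim it.
  val winners: IO[Int] =
    for
      hold <- HoldLine.create
      results <- List.fill(10)("sid-1").parTraverse(hold.tryClaim)
    yield results.count(identity)

  def run: IO[Unit] = winners.flatMap(n => IO.println(s"claims won: $n"))
```

In the question's code, the equivalent change would be to hold the permit around the get, the contains check and addToHoldLine, and to release it before the long-running processing starts, so that unrelated keys are not blocked behind it.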

If you need to guarantee that all work happens on the same thread, then construct a single-threaded ExecutionContext and use evalOn to shift the work onto that EC.

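import cats.effect.{IO, Resource}
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

object MyLogic:
  // The Resource shuts the executor down when MyLogic is released.
  def create: Resource[IO, MyLogic] =
    Resource
      .make(IO(Executors.newSingleThreadExecutor()))(es => IO(es.shutdown()))
      .map(ExecutionContext.fromExecutorService(_))
      .map(new MyLogic(_))

class MyLogic(singleThread: ExecutionContext):
  def doWork = {
    for
      map <- kafkaEventHoldLine.get
      // ...
    yield etc
  }.evalOn(singleThread)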
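As a rough illustration of evalOn, the following self-contained sketch (assuming cats-effect 3 and Scala 3) runs the same effect twice on a dedicated one-thread executor and collects the thread names. SingleThreadDemo, threadName and names are hypothetical names chosen for this example.

```scala
import cats.effect.{IO, IOApp, Resource}
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

object SingleThreadDemo extends IOApp.Simple:
  // Acquire a one-thread executor and shut it down when the Resource is released.
  val singleThread: Resource[IO, ExecutionContext] =
    Resource
      .make(IO(Executors.newSingleThreadExecutor()))(es => IO(es.shutdown()))
      .map(es => ExecutionContext.fromExecutorService(es))

  // Reports the name of the thread the effect is currently running on.
  val threadName: IO[String] = IO(Thread.currentThread().getName)

  // Run the effect twice on the dedicated executor and collect both names.
  val names: IO[(String, String)] =
    singleThread.use { ec =>
      for
        a <- threadName.evalOn(ec)
        b <- threadName.evalOn(ec)
      yield (a, b)
    }

  def run: IO[Unit] =
    names.flatMap((a, b) => IO.println(s"$a / $b"))
```

Both names refer to the executor's only thread. Keep in mind that pinning work to one thread does not by itself make a multi-step effect atomic, since fibers can still interleave at asynchronous boundaries such as sleep, so the Semaphore is likely the better fit when mutual exclusion is the actual goal.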