How to asynchronously interrupt an fs2 stream?

2.1k views Asked by At

I'm trying to interrupt an fs2 stream with SignalRef. I set up and run the stream with the following. The stream should run when switch contains false and should interrupt when switch contains true

  import cats.effect.IO
  import fs2.Stream
  import fs2.concurrent.SignallingRef

  import scala.concurrent.ExecutionContext
  import scala.concurrent.duration.DurationInt

  implicit val contextShift = IO.contextShift(ExecutionContext.global)
  implicit val timer = IO.timer(ExecutionContext.global)

  val switch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)

  val program: Stream[IO, Unit] = {

      val program: Stream[IO, Unit] =
        Stream
          .repeatEval(IO{
            println(java.time.LocalTime.now)
            println(switch.map(_.get).unsafeRunSync.unsafeRunSync)
          })
          .metered(1.second)

      program
        .interruptWhen(Stream.repeatEval(switch.map(_.get).unsafeRunSync))
    }
  
  program.compile.drain.unsafeRunAsync(() => _)

I then attempt to interrupt the stream with

switch.map(_.set(true).unsafeRunSync)

However, the stream keeps going. In stdout I see

15:58:33.048504
false
15:58:34.048760
false
15:58:35.049063
false
15:58:36.049356
false
15:58:37.049615
false

So apparently it's not picking up the switch to true?

2

There are 2 answers

0
Krzysztof Atłasik On

There are several problems with your code.

First of all, please check the signature of switch:

val switch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)

Type of SignallingRef is wrapped into IO. It means that the creation of new SignallingRef is suspended until IO monad is evaluated (implicitly by IO program flow or explicitly by calling unsafeRunXXX). So more suitable name of this value would be probably createSwitch.

When you use switch.map(_.get).unsafeRunSync actually every time you're creating a new instance of SignallingRef with its default value false, so it's never get's evaluated to true.

The rule of thumb is that you should (almost) never call unsafeRunXXX methods up until you finish assembling your IO/Stream program and then you should run that kind of method once.

The right way to do it would be to create switch once in for-comprehension and then you could pass it to program as an argument.

I refactored your code a little bit to do what I think you intended to do and also added some clarification comments.

import cats.effect.IO
import fs2.Stream
import fs2.concurrent.SignallingRef

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt

implicit val contextShift = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)

//I changed name to createSwitch, which I think reflect reality more
val createSwitch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)

val program: Stream[IO, Unit] = {

  //I pass here switch as method's param
  def program(switch: SignallingRef[IO, Boolean]): Stream[IO, Unit] =
    Stream
      .repeatEval {
        for { //I used for-comprehension to split IO into 3 statements
          switchValue <- switch.get //here I get value of switch
          _ <- IO(println(java.time.LocalTime.now)) //I split println into 2 separate statements
          _ <- IO(println(switchValue)) //because it's not a good practive to run 2 effect in single IO
        } yield ()
      }
      .metered(1.second)

  for {
    switch <- Stream.eval(createSwitch)
    //here I create effect to set switch to true after 10 seconds and then use start to run it
    //separate fiber in background. If I didn't do that it would just wait 10 sec and only then run program
    _ <- Stream.eval(switch.set(true).delayBy(10.seconds).start)
    _ <- program(switch).interruptWhen(switch)
  } yield ()

}

program.compile.drain.unsafeRunSync()
0
Mateusz Kubuszok On

Personally, I use my own kill switch for things like that:

final case class KillSwitch[F[_]](stream: Stream[F, Unit], switch: F[Unit])
object KillSwitch {

  def apply[F[_]: Sync]: F[KillSwitch[F]] =
    Ref.of(true).map(switch => KillSwitch(Stream.repeatEval(switch.get).takeWhile(identity).void, switch.set(false)))
}

I use it more or less like:

for {
  KillSwitch(kfStream, switch) <- KillSwitch[F]
  streamFiber <- yourStream.zip(kfStream).map(_._1).compile.drain.start
  // after a while
  _ <- switch // closes kfStream
  result <- streamFiber.join
} yield result

(let's say it's a pseudocode to show the idea).