I'm trying to interrupt an fs2 stream with SignalRef. I set up and run the stream with the following. The stream should run when switch
contains false
and should interrupt when switch
contains true
import cats.effect.IO
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt
implicit val contextShift = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)
val switch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)
val program: Stream[IO, Unit] = {
val program: Stream[IO, Unit] =
Stream
.repeatEval(IO{
println(java.time.LocalTime.now)
println(switch.map(_.get).unsafeRunSync.unsafeRunSync)
})
.metered(1.second)
program
.interruptWhen(Stream.repeatEval(switch.map(_.get).unsafeRunSync))
}
program.compile.drain.unsafeRunAsync(() => _)
I then attempt to interrupt the stream with
switch.map(_.set(true).unsafeRunSync)
However, the stream keeps going. In stdout I see
15:58:33.048504
false
15:58:34.048760
false
15:58:35.049063
false
15:58:36.049356
false
15:58:37.049615
false
So apparently it's not picking up the switch to true?
There are several problems with your code.
First of all, please check the signature of
switch
:Type of
SignallingRef
is wrapped intoIO
. It means that the creation of newSignallingRef
is suspended untilIO
monad is evaluated (implicitly byIO
program flow or explicitly by callingunsafeRunXXX
). So more suitable name of this value would be probablycreateSwitch
.When you use
switch.map(_.get).unsafeRunSync
actually every time you're creating a new instance ofSignallingRef
with its default valuefalse
, so it's never get's evaluated to true.The rule of thumb is that you should (almost) never call
unsafeRunXXX
methods up until you finish assembling your IO/Stream program and then you should run that kind of method once.The right way to do it would be to create
switch
once in for-comprehension and then you could pass it toprogram
as an argument.I refactored your code a little bit to do what I think you intended to do and also added some clarification comments.