I'm trying to interrupt an fs2 stream with SignalRef. I set up and run the stream with the following. The stream should run when switch contains false and should interrupt when switch contains true
import cats.effect.IO
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt
implicit val contextShift = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)
val switch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)
val program: Stream[IO, Unit] = {
val program: Stream[IO, Unit] =
Stream
.repeatEval(IO{
println(java.time.LocalTime.now)
println(switch.map(_.get).unsafeRunSync.unsafeRunSync)
})
.metered(1.second)
program
.interruptWhen(Stream.repeatEval(switch.map(_.get).unsafeRunSync))
}
program.compile.drain.unsafeRunAsync(() => _)
I then attempt to interrupt the stream with
switch.map(_.set(true).unsafeRunSync)
However, the stream keeps going. In stdout I see
15:58:33.048504
false
15:58:34.048760
false
15:58:35.049063
false
15:58:36.049356
false
15:58:37.049615
false
So apparently it's not picking up the switch to true?
There are several problems with your code.
First of all, please check the signature of
switch:Type of
SignallingRefis wrapped intoIO. It means that the creation of newSignallingRefis suspended untilIOmonad is evaluated (implicitly byIOprogram flow or explicitly by callingunsafeRunXXX). So more suitable name of this value would be probablycreateSwitch.When you use
switch.map(_.get).unsafeRunSyncactually every time you're creating a new instance ofSignallingRefwith its default valuefalse, so it's never get's evaluated to true.The rule of thumb is that you should (almost) never call
unsafeRunXXXmethods up until you finish assembling your IO/Stream program and then you should run that kind of method once.The right way to do it would be to create
switchonce in for-comprehension and then you could pass it toprogramas an argument.I refactored your code a little bit to do what I think you intended to do and also added some clarification comments.