How to properly interrupt a fiber in zio test?


I am trying to write some tests for a web service that fork the service, hit some endpoints, and then kill it, but I cannot figure out a way to do this that actually works :(

Here is an "emulated" service I wrote just to demonstrate the problem. Instead of serving web requests, it just reads strings from an input queue and copies them to the output queue. The Mgr, which is also the client, keeps references to both queues and provides an ask method that simulates a client-side endpoint call:

object EchoService {
  val nextId = new AtomicInteger(0)
  val manager = ZLayer.fromZIO(
    for {
      in <- Queue.bounded[String](10)
      out <- Queue.bounded[(Int, String)](10)
    } yield Mgr(in, out)
  )

  val forked = ZLayer.fromZIO(
    ZIO.acquireRelease(
      for {
        mgr <- ZIO.service[Mgr]
        f <- mgr.fork
      } yield f
    )(_.interrupt).withClock(ClockLive)
  )

  def ask(s: String) =
    ZIO.service[Mgr].flatMap(_.ask(s)).withClock(ClockLive)

  case class Mgr(in: Queue[String], out: Queue[(Int, String)]) {
    def ask(s: String) = in.offer(s) *> out.take.timeout(1.second)
    def fork = Svc(nextId.updateAndGet(_ + 1), in, out).serve.fork
  }

  case class Svc(id: Int, in: Queue[String], out: Queue[(Int, String)]) {
    val serve = ZStream.fromQueue(in)
      .takeWhile(_.nonEmpty)
      .timeout(5.seconds)
      .foreach(s => out.offer(id -> s))
      .withClock(ClockLive)
  }
}

And this is how I am trying to test it:

  def spec = suite("Tests")(
    suite("Forked service")(
      test("echo") { EchoService.ask("foo").map(x => assertTrue(x == Some((1, "foo")))).withClock(ClockLive) }
    ).provideSomeLayerShared(EchoService.manager >+> EchoService.forked)
  )

This hangs forever :( (some debugging I have done indicates that .interrupt is invoked, but never returns).

The test completes successfully, and then it just hangs. There are two messages in the log, and I don't understand either of them:

1.

Warning: A test is using time, but is not advancing the test clock, which may result in the test hanging. Use TestClock.adjust to manually advance the time.

I don't get where this is using TestClock. I put .withClock(ClockLive) pretty much wherever I could. What else is missing? Is there a way to just not use TestClock for the whole spec instead of having to override it everywhere explicitly?

2.

Warning: ZIO Test is attempting to close the scope of suite Tests - Forked service in test_case_-665442250, but closing the scope has taken more than 60 seconds to complete. This may indicate a resource leak.

OK, so the question is why this is happening. Does .interrupt not just kill the fiber, but behave more like Java's interrupt, which just politely asks the code to quit? If so, is there another method I can use to stop executing a fiber unconditionally?

Interestingly, if instead of provideSomeLayerShared I use provideSomeLayer, the behavior changes: it still hangs but in a different way. The test does not complete (and doesn't seem to even start), and there is nothing at all in the log, except for this:

Test Tests - Forked service - echo has taken more than 1 m to execute. If this is not expected, consider using TestAspect.timeout to timeout runaway tests for faster diagnostics.

It looks like in this case it is trying to close the scope before even starting the test for some reason (and still hangs in the same place, trying to call interrupt).

1 answer

Answer by paulpdaniels:

This was an interesting problem, because it highlights several issues conspiring together.

To answer the first question, about using the live clock within the environment: you can use TestAspect.withLiveClock, either at the test level or at the suite level:

val spec = suite("MySpec")(
  // applies just to this test
  test("test1") { assertCompletes } @@ TestAspect.withLiveClock     
) @@ TestAspect.withLiveClock // applies to every test in the suite

The second issue that you are encountering relates to this layer (removed the extra clock assignment based on the above answer):

val forked = ZLayer.fromZIO(
  ZIO.acquireRelease(
    for {
      mgr <- ZIO.service[Mgr]
      f <- mgr.fork
    } yield f
  )(_.interrupt)
)

The problem here is one of both interruptibility and scoping. In ZIO, every region has an interruptibility status that indicates whether that region can be actively interrupted. If it can't, the runtime semantically blocks the interrupter until execution reaches a point that allows interruption.

By default fibers are interruptible. However, in layers, or during the acquisition or release phase of a scope, the status is set to uninterruptible. Moreover, the interruptibility status is inherited by child fibers, meaning anything spawned within those regions will also have that behavior unless it is explicitly overridden.

This means that

for {
  mgr <- ZIO.service[Mgr]
  f   <- mgr.fork 
} yield f

is executed uninterruptibly because it sits in an acquisition block and likewise the underlying stream is also executed uninterruptibly.
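
To make the inheritance rule concrete, here is a minimal sketch, assuming ZIO 2.x. The object name InheritedInterruptibility and the helper timeInterrupt are made up for this illustration and are not part of the question's code. It forks a two-second sleep from three different regions, interrupts each fiber right away, and measures how long interrupt takes to return:

```scala
import zio._

// Hypothetical demo object; none of these names come from the question.
object InheritedInterruptibility extends ZIOAppDefault {

  // Stand-in for a long-running service loop.
  val work: UIO[Unit] = ZIO.sleep(2.seconds)

  // Runs `fork` to obtain a fiber, interrupts that fiber immediately,
  // and returns how long `interrupt` took to come back.
  def timeInterrupt(fork: UIO[Fiber.Runtime[Nothing, Unit]]): UIO[Duration] =
    fork.flatMap(_.interrupt.timed).map(_._1)

  val run =
    for {
      // Forked from an ordinary, interruptible region.
      plain     <- timeInterrupt(work.fork)
      // Forked from an uninterruptible region: the child inherits that status.
      inherited <- timeInterrupt(work.fork.uninterruptible)
      // Same region, but the child's own effect is marked interruptible.
      restored  <- timeInterrupt(work.interruptible.fork.uninterruptible)
      _         <- Console.printLine(
                     s"plain=${plain.toMillis}ms inherited=${inherited.toMillis}ms restored=${restored.toMillis}ms"
                   )
    } yield ()
}
```

If the rule above holds, only the middle case should have to wait for the sleep to finish before interrupt returns. With an effect that never finishes, such as the stream in the question, that wait would never end.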

Before moving on, there is one more issue here: the Scope behavior. ZIO.acquireRelease returns a ZIO[Scope with R, E, A]; forks, however, are tied to their parent's scope by default. This means that by forking within the acquireRelease, the fork is tied only to the acquisition, so as soon as you return from the for comprehension it is already trying to interrupt the fork (which it can't, because the fork is uninterruptible). This also passes the scope of the ZIO outside of the layer, which is an anti-pattern.

Instead, I would suggest doing something like this:

val forked = ZLayer.scoped(ZIO.serviceWithZIO[Mgr](_.fork))

Here Mgr.fork is changed to call serve.forkScoped instead of serve.fork. The forkScoped operator returns a fiber whose lifetime is connected to the scope, and the ZLayer.scoped method then connects that scope to the lifecycle of the layer.

Fixing this still leaves the above problem of trying to interrupt a non-interruptible region. Handling this depends on what exactly we are doing within that region and where we can safely inject an interrupt.

I found the following two ways work:

  1. Add interruptible to the serve method: .foreach(s => out.offer(id -> s)).interruptible. You need to be careful with this approach, though, as it can "poke holes" in the interruptibility of the rest of the ZIO.
  2. Add a finalizer to the Mgr layer that shuts down the queues, which allows the stream to end naturally when the layer is released:

val manager = ZLayer.scoped(
  for {
    in  <- Queue.bounded[String](10)
    out <- Queue.bounded[(Int, String)](10)
    _   <- ZIO.addFinalizer(in.shutdown *> out.shutdown)
  } yield Mgr(in, out)
)
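
For comparison, option 1 applied to the service class might look like the sketch below. It assumes ZIO 2.x and the same Svc shape as in the question; the explicit type annotation is my own addition for readability.

```scala
import zio._
import zio.stream._

final case class Svc(id: Int, in: Queue[String], out: Queue[(Int, String)]) {
  // Marking the whole loop interruptible overrides the uninterruptible
  // status that the forked fiber would otherwise inherit from the layer.
  val serve: ZIO[Any, Nothing, Unit] =
    ZStream
      .fromQueue(in)
      .takeWhile(_.nonEmpty)
      .timeout(5.seconds)
      .foreach(s => out.offer(id -> s))
      .interruptible
}
```

The trade-off is the one mentioned in the list: any caller that deliberately runs serve inside an uninterruptible region no longer gets that guarantee, which is why I lean towards the finalizer approach.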

Complete code sample:

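import java.util.concurrent.atomic.AtomicInteger

import zio._
import zio.stream._
import zio.test._

object EchoSpec extends ZIOSpecDefault {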
  val spec = suite("Echo")(
    test("mgr test") {
      EchoService.ask("hello").map(x => assertTrue(x.isDefined))
    }.provide(
      EchoService.manager,
      EchoService.forked
    ) @@ TestAspect.withLiveClock @@ TestAspect.timeout(10.seconds)
  )


  object EchoService {
    val nextId = new AtomicInteger(0)
    val manager = ZLayer.scoped(
      for {
        in  <- Queue.bounded[String](10)
        out <- Queue.bounded[(Int, String)](10)
        _   <- ZIO.addFinalizer(in.shutdown *> out.shutdown)
      } yield Mgr(in, out)
    )

    val forked = ZLayer.scoped(ZIO.serviceWithZIO[Mgr](_.fork)).unit

    def ask(s: String) =
      ZIO.serviceWithZIO[Mgr](_.ask(s))

    case class Mgr(in: Queue[String], out: Queue[(Int, String)]) {
      def ask(s: String) = in.offer(s) *> out.take.timeout(1.second)
      def fork           = Svc(nextId.updateAndGet(_ + 1), in, out).serve.forkScoped
    }

    case class Svc(id: Int, in: Queue[String], out: Queue[(Int, String)]) {
      val serve = ZStream
        .fromQueue(in)
        .takeWhile(_.nonEmpty)
        .timeout(5.seconds)
        .foreach(s => out.offer(id -> s))
    }
  }
}