Correct way to convert queue emitting Take[_, A] to A?

I don't quite understand the user-level design of ZIO's ZStream when writing to a queue:

  val queue: RIO[Scope, Dequeue[Take[Nothing, Int]]] =
    ZStream(1, 2).toQueue()

... emitting (for example) Take(Success(Chunk(1)))

I understand that, at a low level, something like this is necessary as a way of expressing completions & errors as data types. But this seems a bit cumbersome at the user level. Is there some way I'm supposed to be unwrapping these into plain elements?

It looks like Take#done might be part of the puzzle:

  def done[R](implicit trace: Trace): ZIO[R, Option[E], Chunk[A]] =
    ZIO.done(exit)

But I feel like there's probably some utility lying around somewhere, which I'm not seeing, that knows how to unwrap the resulting Option[E] into an E and flatten the Chunk[A], so that each result-consumption operation can just deal with a single A.

There is 1 answer

Tim W On

After a bit of research and experimentation I've learned some things. These might not be optimal (will leave this open for a while to see if others add helpful answers), but they seem okay for the moment.

One solution is to use ZStream#toQueueOfElements(), which instead yields a Dequeue[Exit[Option[E], A]] (for the stream above, Dequeue[Exit[Option[Nothing], Int]]). This isn't quite like being able to do an .await on a Dequeue[A], but it's a little bit better. For an element (a) you can do:

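a match {
  case Exit.Success(value) => ??? // value is a single plain element
  case Exit.Failure(cause) => ??? // cause is a Cause[Option[E]]; a None failure marks end of stream
}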
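
For completeness, here is a rough sketch of how the pieces above might fit together, assuming ZIO 2.x. The names QueueUnwrapSketch, viaFlattenTake and drain are just made up for illustration. The first part turns the Take queue back into a stream and uses ZStream#flattenTake to get plain elements again; the second pulls from the toQueueOfElements() queue by hand with the match shown above. I haven't checked whether this is the intended idiom, so treat it as one possible approach rather than the official one.

```scala
import zio._
import zio.stream._

// Hypothetical demo object; only the ZIO calls themselves come from the library.
object QueueUnwrapSketch extends ZIOAppDefault {

  // Option 1: wrap the queue of Takes in a stream and flatten it.
  // flattenTake unwraps each Take into its elements and ends the stream
  // when the end-of-stream Take arrives.
  val viaFlattenTake: ZIO[Scope, Nothing, Chunk[Int]] =
    for {
      queue <- ZStream(1, 2).toQueue()
      elems <- ZStream.fromQueue(queue).flattenTake.runCollect
    } yield elems

  // Option 2: take items one at a time from a queue of Exit values.
  // A failure whose cause holds None is treated as end of stream;
  // any other cause is re-raised as a real failure.
  def drain[E, A](
    queue: Dequeue[Exit[Option[E], A]],
    acc: Chunk[A] = Chunk.empty
  ): IO[E, Chunk[A]] =
    queue.take.flatMap {
      case Exit.Success(value) => drain(queue, acc :+ value)
      case Exit.Failure(cause) =>
        Cause.flipCauseOption(cause) match {
          case None    => ZIO.succeed(acc)  // stream finished normally
          case Some(c) => ZIO.failCause(c)  // genuine error, defect or interruption
        }
    }

  def run =
    ZIO.scoped {
      for {
        a <- viaFlattenTake
        b <- ZStream(1, 2).toQueueOfElements().flatMap(q => drain(q))
        _ <- Console.printLine(s"flattenTake: $a, toQueueOfElements: $b")
      } yield ()
    }
}
```

If you only want the elements and don't need to inspect each Exit yourself, ZStream.fromQueue(queue).flattenExitOption looks like the counterpart of flattenTake for the toQueueOfElements() queue, which would make the hand-written drain loop unnecessary.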