I don't quite understand the user-level design of ZIO's ZStream when writing to a queue:
val queue: RIO[Scope, Dequeue[Take[Nothing, Int]]] =
ZStream(1, 2).toQueue()
... emitting (for example) Take(Success(Chunk(1)))
I understand that, at a low level, something like this is necessary as a way of expressing completions & errors as data types. But this seems a bit cumbersome at the user level. Is there some way I'm supposed to be unwrapping these into plain elements?
It looks like Take#done might be part of the puzzle:
def done[R](implicit trace: Trace): ZIO[R, Option[E], Chunk[A]] =
ZIO.done(exit)
But I suspect there's a utility lying around somewhere that I'm not seeing, one that knows how to unwrap the resulting Option[E] into an E and flatten the Chunk[A], so that each result-consumption operation can just deal with a single A.
After a bit of research and experimentation I've learned some things. These might not be optimal (will leave this open for a while to see if others add helpful answers), but they seem okay for the moment.
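For the Take-valued queue from the question, one thing that appears to work is turning the queue back into a stream and calling flattenTake, which interprets each Take (chunk, failure, or end-of-stream) and emits plain elements. This is only a minimal sketch assuming ZIO 2.x; the object name TakeUnwrapSketch is made up for illustration, and it helps only if you are happy to consume the queue as a stream rather than call take on it directly.

```scala
import zio._
import zio.stream._

// Hypothetical app name, used only for this illustration.
object TakeUnwrapSketch extends ZIOAppDefault {
  val run =
    ZIO.scoped {
      for {
        // Same queue as in the question: Dequeue[Take[Nothing, Int]]
        queue <- ZStream(1, 2).toQueue()
        // Read the queue as a stream of Take values, then let flattenTake
        // unwrap them: chunks become elements, failures fail the stream,
        // and the end-of-stream Take completes it.
        elements <- ZStream.fromQueue(queue).flattenTake.runCollect
        _ <- Console.printLine(elements)
      } yield ()
    }
}
```

The design trade-off is that the unwrapping logic stays inside the library, but you are back to a ZStream[Any, Nothing, Int] rather than a queue of plain Ints.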
One solution is to use ZStream#toQueueOfElements(), which instead yields Dequeue[Exit[Option[Nothing], A]]. This isn't quite like being able to do an .await on a Dequeue[A], but it's a little bit better -- for an element (a) you can do: