How to read messages from ZHub via ZStream?

303 views Asked by At

I am new to ZHub and ZStream and wanted to familiarize myself with their APIs.

Unfortnuately, I could not make this simple example work:

for
    hub <- Hub.bounded[String](4)
    stream = ZStream.fromHub(hub)
    _ <- hub.publish("Hello")
    _ <- hub.publish("World")
    collected <- stream.runCollect
    _ <- ZIO.foreach(collected) { msg => console.putStrLn(msg) }
yield
    ()

This program does not terminate, I suspect, because I am trying to collect an infinite stream. I have also tried to print the messages using stream.tap(...) or to shut down the hub. Nothing has helped.

What am I missing here? Any help is appreciated, thanks.

1

There are 1 answers

0
Lando-L On BEST ANSWER

@adamgfraser kindly provided a working example on GitHub:

import zio._
import zio.stream._

object Example extends App {

  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    for {
      promise <- Promise.make[Nothing, Unit]
      hub     <- Hub.bounded[String](2)
      stream = ZStream.managed(hub.subscribe).flatMap { queue =>
                 ZStream.fromEffect(promise.succeed(())) *>
                   ZStream.fromQueue(queue)
               }
      fiber     <- stream.take(2).runCollect.fork
      _         <- promise.await
      _         <- hub.publish("Hello")
      _         <- hub.publish("World")
      collected <- fiber.join
      _         <- ZIO.foreach(collected)(console.putStrLn(_)).orDie
    } yield ExitCode.success
}

My mistake was to publish values to the hub before waiting for the subscription to complete.