How to create a stream with Scalaz-Stream?


It must be damn simple. But for some reason I cannot make it work.

  • If I do io.linesR(...), I have a stream of lines of the file, it's ok.
  • If I do Process.emitAll(...), I have a stream of pre-defined values. It also works.

But what I actually need is to produce values for scalaz-stream asynchronously (well, from an Akka actor).

I have tried:

  • async.unboundedQueue[String]
  • async.signal[String]

Then I called queue.enqueueOne(...).run or signal.set(...).run and listened on queue.dequeue or signal.discrete, using just .map and .to, in a pipeline that is proven to work with the other kinds of stream (either Process.emitAll or lines from a file).

What is the secret? What is the preferred way to create a channel to be streamed later? How do I feed it with values from another context?

Thanks!


There is 1 answer

Answer by lmm

If the values are produced asynchronously but in a way that can be driven from the stream, I've found it easiest to use the "primitive" await method and construct the process "by hand". You need an indirectly recursive function:

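// Needs akka.pattern.ask and an implicit akka.util.Timeout in scope; myActor and NextValuePlease are placeholders.
def processStep(v: Int): Process[Future, Int] =
  Process.emit(v) ++ Process.await((myActor ? NextValuePlease()).mapTo[Int])(w => processStep(w))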

But if you need a truly async process, driven from elsewhere, I've never done that.
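For what it's worth, the queue approach from the question might look roughly like the sketch below. It is untested and assumes a scalaz-stream version in the 0.7.x range, where Task still has .run. The object name QueueSketch and the sample values are made up for illustration. Two things are easy to miss: a Process is only a description and does nothing until it is run, and queue.dequeue only terminates once the queue has been closed.

```scala
import scalaz.concurrent.Task
import scalaz.stream.{async, io, Process}

object QueueSketch extends App {
  // Unbounded in-memory queue; values can be enqueued from any thread.
  val queue = async.unboundedQueue[String]

  // This only describes the consumer. Nothing happens until it is run.
  val consumer: Process[Task, Unit] =
    queue.dequeue.map(_.toUpperCase).to(io.stdOutLines)

  // Producer side (this could live in an actor's receive method).
  // enqueueOne returns a Task, which has to be run to take effect.
  queue.enqueueOne("hello").run
  queue.enqueueOne("world").run

  // Closing the queue lets the dequeue side finish once it has drained.
  queue.close.run

  // Run the consumer: the first run yields a Task[Unit], the second executes it.
  consumer.run.run
}
```

In a real application the consumer would more likely be started with consumer.run.runAsync(...) before anything is enqueued, so that it does not block the calling thread while waiting for values.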