It must be damn simple. But for some reason I cannot make it work.
- If I do
io.linesR(...)
, I have a stream of lines of the file, it's ok. - If I do
Processor.emitAll()
, I have a stream of pre-defined values. It also works.
But what I actually need is to produce values for scalaz-stream asynchronously (well, from Akka actor).
I have tried:
async.unboundedQueue[String]
async.signal[String]
Then called queue.enqueueOne(...).run
or signal.set(...).run
and listened to queue.dequeue
or signal.discrete
. Just with .map
and .to
. With an example proved to work with another kind of stream -- either with Processor or lines from the file.
What is the secret? What is the preferred way to create a channel to be streamed later? How to feed it with values from another context?
Thanks!
If the values are produced asynchronously but in a way that can be driven from the stream, I've found it easiest to use the "primitive"
await
method and construct the process "by hand". You need an indirectly recursive function:But if you need a truly async process, driven from elsewhere, I've never done that.