If I have a simple process which is emitting values of type String and I wish to send these to multiple sinks (i.e. each sink gets sent the String), how do I do this?
For example, running this program:
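import scalaz.stream._

object Play extends App {
  // Builds a function that prepends s to its argument.
  def prepend(s: String): String => String = s ++ _
  val out1 = io.stdOutLines.map(prepend("1-") andThen _)
  val out2 = io.stdOutLines.map(prepend("2-") andThen _)
  // merge picks whichever sink is available, so each line reaches only one of them.
  val p = io.stdInLines to (out1 merge out2)
  p.run.run
}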
The output looks like:
a //input
1-a
b //input
2-b
c //input
2-c
d //input
1-d
I want the output to be this:
a //input
1-a
2-a
b //input
2-b
1-b
c //input
2-c
1-c
d //input
1-d
2-d
EDIT
I can achieve this as follows:
implicit class ToBoth[O](p: Process[Task, O]) {
  def toBoth(s1: Sink[Task, O], s2: Sink[Task, O]): Process[Task, Unit] = {
    (for (o <- p; n <- Process.emit(o) ++ Process.emit(o)) yield n) to (s1 interleave s2)
  }
}
That is, I duplicate the input and interleave the output. This can be generalized:
  def toAll(sinks: Sink[Task, O]*): Process[Task, Unit] = {
    (for (o <- p; n <- Process.emitAll(sinks.map(_ => o))) yield n) to
      sinks.reduceLeftOption(_ interleave _).getOrElse(Process.empty)
  }
EDIT 2
I just realized that the generalization toAll does not work. toBoth does, though.
Is there a better (built-in) way?
You could also use observe and to to attach multiple Sinks to a Process:
observe is like to but echoes what is passed into the sink. So io.stdInLines.observe(out1) still emits the strings that come from stdin (that means it is of type Process[Task, String]) but also sends them to the sink out1.
As Eric pointed out, it is also possible to zip Sinks together. Here is a more elaborate example that sends individual lines of a logfile to different sinks depending on their log level:
Running this will output:
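Returning to the simpler observe/to combination described earlier in this answer, the following is only a minimal sketch of how it might look, not the original example. It assumes a scalaz-stream version in which p.run.run executes the process (as in the question). The names ObserveSketch, collector, toAllSinks and demo are hypothetical, and the in-memory buffers stand in for stdout so that the effect of each sink can be inspected.

```scala
import scala.collection.mutable.ListBuffer
import scalaz.concurrent.Task
import scalaz.stream._

object ObserveSketch {
  // Hypothetical helper: a sink that appends each tagged line to an in-memory buffer.
  def collector(tag: String, buf: ListBuffer[String]): Sink[Task, String] =
    Process.constant((s: String) => Task.delay { buf += (tag + s); () })

  // Hypothetical generalization: observe every sink in turn,
  // then replace the echoed values with Unit.
  def toAllSinks[O](p: Process[Task, O], sinks: Sink[Task, O]*): Process[Task, Unit] =
    sinks.foldLeft(p)((acc, s) => acc.observe(s)).map(_ => ())

  def demo(): (List[String], List[String]) = {
    val seen1 = ListBuffer.empty[String]
    val seen2 = ListBuffer.empty[String]
    val source: Process[Task, String] = Process.emitAll(Seq("a", "b"))
    // observe feeds the first sink and echoes each value;
    // to feeds the second sink and ends the chain.
    source.observe(collector("1-", seen1)).to(collector("2-", seen2)).run.run
    (seen1.toList, seen2.toList)
  }
}
```

With the sinks from the question, the same shape would presumably be io.stdInLines.observe(out1).to(out2), so that every line read from stdin reaches both out1 and out2. Chaining observe calls, as toAllSinks does, avoids having to duplicate the input and interleave the sinks by hand.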