I would like to create a Source and later push elements to it, as in:
val src = ... // create the Source here
// and then, do something like this
pushElement(x1, src)
pushElement(x2, src)
What is the recommended way to do this?
Thanks!
Since Akka 2.5, Source has a preMaterialize method.
According to the documentation, this looks like the recommended way to do what you ask:
There are situations in which you require a Source materialized value before the Source gets hooked up to the rest of the graph. This is particularly useful in the case of “materialized value powered” Sources, like Source.queue, Source.actorRef or Source.maybe.
Below is an example of how this would work with a SourceQueue. Elements are pushed to the queue before and after materialization, as well as from within the Flow:
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, OverflowStrategy}
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
val sourceDecl = Source.queue[String](bufferSize = 2, OverflowStrategy.backpressure)
val (sourceMat, source) = sourceDecl.preMaterialize()
// Adding element before actual materialization
sourceMat.offer("pre materialization element")
val flow = Flow[String].map { e =>
  if (!e.contains("new")) {
    // Adding elements from within the flow
    sourceMat.offer("new element generated inside the flow")
  }
  s"Processing $e"
}
// Actually materializing with `run`
source.via(flow).to(Sink.foreach(println)).run()
// Adding element after materialization
sourceMat.offer("post materialization element")
Output:
Processing pre materialization element
Processing post materialization element
Processing new element generated inside the flow
Processing new element generated inside the flow
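Note that offer returns a Future[QueueOfferResult], which the example above ignores. A minimal sketch of inspecting that result, assuming the same Akka 2.5-style setup (ActorMaterializer is deprecated in later versions); the system name, the element values and the 5-second timeouts are illustrative choices, not part of the original answer:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("OfferResultSketch") // hypothetical name
implicit val materializer: ActorMaterializer = ActorMaterializer()
import system.dispatcher // ExecutionContext for the Future combinators below

val (queue, source) =
  Source.queue[String](bufferSize = 2, OverflowStrategy.backpressure).preMaterialize()

// Collect everything the stream emits so it can be inspected afterwards
val receivedF = source.runWith(Sink.seq[String])

// Chain the offers so each one starts only after the previous one was answered
val offeredF = for {
  first  <- queue.offer("a")
  second <- queue.offer("b")
} yield Seq(first, second)

val offered = Await.result(offeredF, 5.seconds)
offered.foreach {
  case QueueOfferResult.Enqueued       => println("element accepted")
  case QueueOfferResult.Dropped        => println("element dropped by the overflow strategy")
  case QueueOfferResult.Failure(cause) => println(s"offer failed: $cause")
  case QueueOfferResult.QueueClosed    => println("queue was already completed")
}

// Completing the queue lets the stream finish once buffered elements are emitted
queue.complete()
val received = Await.result(receivedF, 5.seconds)
println(received)

system.terminate()
```

Chaining the offers matters mostly with OverflowStrategy.backpressure: once the buffer is full, a second offer made while an earlier one is still pending fails its Future instead of waiting.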
After experimenting and looking for a good solution to this, I came across the following one, which is clean, simple, and works both before and after materialization: https://stackoverflow.com/a/32553913/6791842
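import akka.actor.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Sink, Source}
import org.reactivestreams.Publisher
// Assumes an implicit ActorSystem and materializer are already in scope
val (ref: ActorRef, publisher: Publisher[Int]) =
  Source.actorRef[Int](bufferSize = 1000, OverflowStrategy.fail)
    .toMat(Sink.asPublisher(fanout = true))(Keep.both).run()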
ref ! 1 //before
val source = Source.fromPublisher(publisher)
ref ! 2 //before
Thread.sleep(1000)
ref ! 3 //before
source.runForeach(println)
ref ! 4 //after
Thread.sleep(1000)
ref ! 5 //after
Output:
1
2
3
4
5
There are three ways this can be achieved:
1. Post Materialization with SourceQueue
You can use Source.queue, which materializes into a SourceQueue when the stream is run. Elements are then pushed with offer on that queue.
2. Post Materialization with Actor
There is a similar question and answer here, the gist being that you materialize the stream as an ActorRef and send messages to that ref.
3. Pre Materialization with Actor
Similarly, you could explicitly create an Actor that contains a message buffer, use that Actor to create a Source, and then send that Actor messages as described in the answer here.
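The first two approaches could be sketched as follows. This is only an illustration under assumptions: an Akka 2.5-style setup with ActorMaterializer (deprecated in later versions) and the two-argument Source.actorRef (also deprecated in 2.6), plus made-up names, buffer sizes and timeouts. The third approach is not shown.

```scala
import akka.actor.{ActorSystem, Status}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("PushAfterMaterialization") // hypothetical name
implicit val materializer: ActorMaterializer = ActorMaterializer()

// 1. Source.queue: running the graph yields the queue as materialized value
val (queue, queueDone) =
  Source.queue[Int](bufferSize = 16, OverflowStrategy.backpressure)
    .toMat(Sink.seq[Int])(Keep.both)
    .run()

queue.offer(1)
queue.offer(2)
queue.complete() // finish the stream after the buffered elements are emitted

// 2. Source.actorRef: running the graph yields an ActorRef as materialized value
val (ref, actorDone) =
  Source.actorRef[Int](bufferSize = 16, OverflowStrategy.fail)
    .toMat(Sink.seq[Int])(Keep.both)
    .run()

ref ! 1
ref ! 2
ref ! Status.Success("done") // completes the stream once the buffer is drained

val fromQueue = Await.result(queueDone, 5.seconds)
val fromActor = Await.result(actorDone, 5.seconds)
println(s"queue: $fromQueue, actor: $fromActor")

system.terminate()
```

The practical difference is feedback: offer on the queue returns a Future describing what happened to the element, while messages sent to the ActorRef are fire and forget and are handled only by the chosen overflow strategy.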