I'm trying to understand why the code snippet below behaves the way it does. I would have thought that, because the Sink cannot produce demand as fast as the Source produces content, I would get Dropped results for some of the offers (the overflow strategy is set to dropBuffer), and also an error and a QueueClosed message after the self-destruct piece.
The snippet:
package playground

import java.time.LocalDateTime
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed}
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._

case object MessageToSink

object Playground extends App {
  implicit val actorSystem = ActorSystem("Playground")
  implicit val execCntxt = actorSystem.dispatcher
  val sinkActor = actorSystem.actorOf(Props[Actor2SinkFwder])
  actorSystem.scheduler.schedule(1 millisecond, 50 milliseconds, sinkActor, MessageToSink)
  println(s"Playground has started... ${LocalDateTime.now()}")
}

class Actor2SinkFwder extends Actor with ActorLogging {
  implicit val materializer = ActorMaterializer()
  implicit val execCtxt = context.dispatcher
  val flow = Source.queue[Int](bufferSize = 1, overflowStrategy = OverflowStrategy.dropBuffer)
    .to(Sink.foreach[Int] {
      i =>
        println(s"$i Sinking starts at ${LocalDateTime.now()}")
        Thread.sleep(150)
        if (i == 5) throw new RuntimeException("KaBoom!")
        println(s"$i Sinking completes at ${LocalDateTime.now()}")
    }).run()
  val i: AtomicInteger = new AtomicInteger(0)

  override def receive: Receive = {
    case MessageToSink =>
      val num = i.incrementAndGet()
      println(s"$num Sink Command received at ${LocalDateTime.now()}")
      flow.offer(num).collect {
        case Enqueued => println(s"$num Enqueued ${LocalDateTime.now}")
        case Dropped => println(s"$num Dropped ${LocalDateTime.now}")
        case Failure(err) => println(s"$num Failed ${LocalDateTime.now} $err")
        case QueueClosed => println(s"$num Failed ${LocalDateTime.now} QueueClosed")
      }
  }
}
The Output:
Playground has started... 2016-12-27T18:35:29.574
1 Sink Command received at 2016-12-27T18:35:29.640
2 Sink Command received at 2016-12-27T18:35:29.642
3 Sink Command received at 2016-12-27T18:35:29.642
1 Sinking starts at 2016-12-27T18:35:29.649
1 Enqueued 2016-12-27T18:35:29.650
4 Sink Command received at 2016-12-27T18:35:29.688
5 Sink Command received at 2016-12-27T18:35:29.738
6 Sink Command received at 2016-12-27T18:35:29.788
1 Sinking completes at 2016-12-27T18:35:29.799
2 Sinking starts at 2016-12-27T18:35:29.800
2 Enqueued 2016-12-27T18:35:29.800
7 Sink Command received at 2016-12-27T18:35:29.838
8 Sink Command received at 2016-12-27T18:35:29.888
9 Sink Command received at 2016-12-27T18:35:29.938
2 Sinking completes at 2016-12-27T18:35:29.950
3 Sinking starts at 2016-12-27T18:35:29.951
3 Enqueued 2016-12-27T18:35:29.951
10 Sink Command received at 2016-12-27T18:35:29.988
11 Sink Command received at 2016-12-27T18:35:30.038
12 Sink Command received at 2016-12-27T18:35:30.088
3 Sinking completes at 2016-12-27T18:35:30.101
4 Sinking starts at 2016-12-27T18:35:30.101
4 Enqueued 2016-12-27T18:35:30.101
13 Sink Command received at 2016-12-27T18:35:30.138
14 Sink Command received at 2016-12-27T18:35:30.189
15 Sink Command received at 2016-12-27T18:35:30.238
4 Sinking completes at 2016-12-27T18:35:30.251
5 Sinking starts at 2016-12-27T18:35:30.251
5 Enqueued 2016-12-27T18:35:30.252
16 Sink Command received at 2016-12-27T18:35:30.288
17 Sink Command received at 2016-12-27T18:35:30.338
18 Sink Command received at 2016-12-27T18:35:30.388
19 Sink Command received at 2016-12-27T18:35:30.438
20 Sink Command received at 2016-12-27T18:35:30.488
21 Sink Command received at 2016-12-27T18:35:30.538
22 Sink Command received at 2016-12-27T18:35:30.588
23 Sink Command received at 2016-12-27T18:35:30.638
24 Sink Command received at 2016-12-27T18:35:30.688
25 Sink Command received at 2016-12-27T18:35:30.738
26 Sink Command received at 2016-12-27T18:35:30.788
etc...
I think my misunderstanding is around the use of getAsyncCallback in the QueueSource class. Even though the offer call in the QueueSource invokes the stageLogic with the correct Offer details, the actual handler for this code in the stage logic doesn't get invoked until the previous element has finished processing, so none of the logic for checking buffer sizes or applying overflow strategies is applied... :-/
To see the result you're expecting, you should add an async stage between your Source and your Sink. This tells Akka to run the two stages in two distinct actors, by forcing an asynchronous boundary between them.
Without the async, Akka will optimize the execution by fusing everything into one actor, which sequentialises the processing. In your example, as you noticed, a message is not actually offered to the queue until the Thread.sleep(150) of the previous message has completed. More info on the topic can be found here.
Also, you should add one more case when matching the .offer result: a Failure of the Future, which the Future is completed with once the queue downstream has failed. This applies to all messages offered after the first 5.
Note that, even after doing all of the above, you will not see any QueueOfferResult.Dropped results. That is because you chose the DropBuffer strategy. Every incoming message will be queued (therefore producing an Enqueued message), kicking out the existing buffer. If you change the strategy to DropNew, you should start seeing some Dropped messages.