Backpressure strategies for Akka Stream Source.queue not working


I'm trying to understand why the code snippet below behaves the way it does. Because the Sink cannot produce demand as fast as the Source produces elements, I expected some of the offers to come back as dropped (the overflow strategy is set to OverflowStrategy.dropBuffer). I also expected an error and a queue-closed message after the self-destruct piece, where element 5 throws an exception.

The snippet:

package playground

import java.time.LocalDateTime
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed}
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._

case object MessageToSink

object Playground extends App {

  implicit val actorSystem = ActorSystem("Playground")
  implicit val execCntxt = actorSystem.dispatcher

  val sinkActor = actorSystem.actorOf(Props[Actor2SinkFwder])
  actorSystem.scheduler.schedule(1 millisecond, 50 milliseconds, sinkActor, MessageToSink)

  println(s"Playground has started... ${LocalDateTime.now()}")
}

class Actor2SinkFwder extends Actor with ActorLogging {

  implicit val materializer = ActorMaterializer()
  implicit val execCtxt = context.dispatcher

  val flow = Source.queue[Int](bufferSize = 1, overflowStrategy = OverflowStrategy.dropBuffer)
    .to(Sink.foreach[Int] {
      i =>
        println(s"$i Sinking starts at ${LocalDateTime.now()}")
        Thread.sleep(150)
        if (i == 5) throw new RuntimeException("KaBoom!")
        println(s"$i Sinking completes at ${LocalDateTime.now()}")
    }).run()

  val i: AtomicInteger = new AtomicInteger(0)

  override def receive: Receive = {
    case MessageToSink =>
      val num = i.incrementAndGet()
      println(s"$num Sink Command received at ${LocalDateTime.now()}")
      flow.offer(num).collect {
        case Enqueued => println(s"$num Enqueued ${LocalDateTime.now}")
        case Dropped => println(s"$num Dropped ${LocalDateTime.now}")
        case Failure(err) => println(s"$num Failed ${LocalDateTime.now} $err")
        case QueueClosed => println(s"$num Failed ${LocalDateTime.now} QueueClosed")
      }
   }
}

The Output:

Playground has started... 2016-12-27T18:35:29.574
1 Sink Command received at 2016-12-27T18:35:29.640
2 Sink Command received at 2016-12-27T18:35:29.642
3 Sink Command received at 2016-12-27T18:35:29.642
1 Sinking starts at 2016-12-27T18:35:29.649
1 Enqueued 2016-12-27T18:35:29.650
4 Sink Command received at 2016-12-27T18:35:29.688
5 Sink Command received at 2016-12-27T18:35:29.738
6 Sink Command received at 2016-12-27T18:35:29.788
1 Sinking completes at 2016-12-27T18:35:29.799
2 Sinking starts at 2016-12-27T18:35:29.800
2 Enqueued 2016-12-27T18:35:29.800
7 Sink Command received at 2016-12-27T18:35:29.838
8 Sink Command received at 2016-12-27T18:35:29.888
9 Sink Command received at 2016-12-27T18:35:29.938
2 Sinking completes at 2016-12-27T18:35:29.950
3 Sinking starts at 2016-12-27T18:35:29.951
3 Enqueued 2016-12-27T18:35:29.951
10 Sink Command received at 2016-12-27T18:35:29.988
11 Sink Command received at 2016-12-27T18:35:30.038
12 Sink Command received at 2016-12-27T18:35:30.088
3 Sinking completes at 2016-12-27T18:35:30.101
4 Sinking starts at 2016-12-27T18:35:30.101
4 Enqueued 2016-12-27T18:35:30.101
13 Sink Command received at 2016-12-27T18:35:30.138
14 Sink Command received at 2016-12-27T18:35:30.189
15 Sink Command received at 2016-12-27T18:35:30.238
4 Sinking completes at 2016-12-27T18:35:30.251
5 Sinking starts at 2016-12-27T18:35:30.251
5 Enqueued 2016-12-27T18:35:30.252
16 Sink Command received at 2016-12-27T18:35:30.288
17 Sink Command received at 2016-12-27T18:35:30.338
18 Sink Command received at 2016-12-27T18:35:30.388
19 Sink Command received at 2016-12-27T18:35:30.438
20 Sink Command received at 2016-12-27T18:35:30.488
21 Sink Command received at 2016-12-27T18:35:30.538
22 Sink Command received at 2016-12-27T18:35:30.588
23 Sink Command received at 2016-12-27T18:35:30.638
24 Sink Command received at 2016-12-27T18:35:30.688
25 Sink Command received at 2016-12-27T18:35:30.738
26 Sink Command received at 2016-12-27T18:35:30.788
etc...

I think my misunderstanding is around the use of getAsyncCallback in the QueueSource class. Even though the offer call in QueueSource invokes the stage logic with the correct Offer details, the actual handler for it in the stage logic doesn't get invoked until the previous element has finished processing, so none of the logic for checking buffer sizes or applying overflow strategies is applied... :-/


There are 2 answers

Answer by Stefano Bonetti (best answer)

To see the result you're expecting, you should add an asynchronous boundary (.async) between your Source and your Sink. This tells Akka to run the two stages in two distinct actors.

Without the async boundary, Akka optimizes execution by fusing all stages into a single actor, which makes the processing sequential. In your example, as you noticed, a message is not actually offered to the queue until the Thread.sleep(150) for the previous message has completed. The Akka Streams documentation on operator fusion covers this in more detail.

  val flow = Source.queue[Int](bufferSize = 1, overflowStrategy = OverflowStrategy.dropBuffer)
    .async
    .to(Sink.foreach[Int] {...}).run()

Also, you should handle one more case when matching the .offer result: a failure of the Future itself (scala.util.Failure), which is what the Future is completed with once the queue has been failed from downstream. This applies to every message offered after the first 5.

  // Requires: import scala.util.Success
  override def receive: Receive = {
    case MessageToSink =>
      val num = i.incrementAndGet()
      println(s"$num Sink Command received at ${LocalDateTime.now()}")
      flow.offer(num).onComplete {
        // Success(...) wraps the QueueOfferResult cases imported from akka.stream
        case Success(Enqueued) => println(s"$num Enqueued ${LocalDateTime.now}")
        case Success(Dropped) => println(s"$num Dropped ${LocalDateTime.now}")
        case Success(Failure(err)) => println(s"$num Failed ${LocalDateTime.now} $err")
        case Success(QueueClosed) => println(s"$num Failed ${LocalDateTime.now} QueueClosed")
        // The Future itself failed; fully qualified to avoid clashing with QueueOfferResult.Failure
        case scala.util.Failure(err) => println(s"$num Failed ${LocalDateTime.now} with exception $err")
      }
  }
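
Why the original collect version stays silent once the Future fails can be reproduced without Akka. The sketch below uses only the Scala standard library. FailedOfferDemo and failedOffer are hypothetical names: failedOffer merely stands in for the Future that offer returns after the stream has failed, it is not an Akka call.

```scala
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object FailedOfferDemo {
  // Hypothetical stand-in for an offer whose Future has been failed
  def failedOffer: Future[String] = Future.failed(new RuntimeException("KaBoom!"))

  /** Returns how often the partial function passed to collect was invoked. */
  def viaCollect(): Int = {
    val calls = new AtomicInteger(0)
    // collect only transforms successful values; a failed Future passes through untouched
    val f = failedOffer.collect { case s => calls.incrementAndGet(); s }
    Await.ready(f, 1.second) // wait for completion without rethrowing the failure
    calls.get()
  }

  /** Returns a description of what onComplete observed. */
  def viaOnComplete(): String = {
    val seen = Promise[String]()
    failedOffer.onComplete {
      case Success(s) => seen.success(s"ok: $s")
      case Failure(e) => seen.success(s"failed: ${e.getMessage}")
    }
    Await.result(seen.future, 1.second)
  }
}
```

Here viaCollect() reports that the partial function never ran, while viaOnComplete() sees the exception. This is the difference between the collect call in the question and the onComplete call above.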

Note that, even after doing all of the above, you will not see any QueueOfferResult.Dropped results. That is because you chose the dropBuffer strategy: every incoming message is queued (and therefore reported as Enqueued), kicking out whatever was in the buffer. If you change the strategy to dropNew, you should start seeing some Dropped results.
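
The difference between the two strategies, as seen by the caller of offer, can be sketched with a small pure-Scala model. This is not Akka code: OverflowModel and its Strategy and OfferResult types are hypothetical names that only mirror the behaviour described above, and the model assumes no consumer drains the buffer while the offers arrive.

```scala
object OverflowModel {
  sealed trait Strategy
  case object DropBuffer extends Strategy
  case object DropNew extends Strategy

  // Hypothetical mirror of the two offer results discussed above
  sealed trait OfferResult
  case object Enqueued extends OfferResult
  case object Dropped extends OfferResult

  /** Offers one element; returns the new buffer and the result reported to the caller. */
  def offer(buffer: Vector[Int], capacity: Int, strategy: Strategy, elem: Int): (Vector[Int], OfferResult) =
    if (buffer.size < capacity) (buffer :+ elem, Enqueued)
    else strategy match {
      case DropBuffer => (Vector(elem), Enqueued) // old contents are discarded, new element is accepted
      case DropNew    => (buffer, Dropped)        // buffer is kept, new element is rejected
    }

  /** Offers all elements in order, with nothing being consumed in between. */
  def offerAll(capacity: Int, strategy: Strategy, elems: Seq[Int]): (Vector[Int], Vector[OfferResult]) =
    elems.foldLeft((Vector.empty[Int], Vector.empty[OfferResult])) {
      case ((buf, results), e) =>
        val (next, result) = offer(buf, capacity, strategy, e)
        (next, results :+ result)
    }
}
```

With a capacity of 1 and three offers, the DropBuffer model reports Enqueued three times and keeps only the last element, whereas the DropNew model reports Enqueued, Dropped, Dropped and keeps the first.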

Answer by gabrielgiussi

I've found the answer to the problem I described in the comment. I think it is closely related to the original question, so I want to add it as an answer (but the correct answer is the one from Stefano).

The elements causing this behavior are buffers. They are not the buffers we configure explicitly, for example with map(...).buffer(1, OverflowStrategy.dropBuffer).async, but internal buffers that are built on materialization. These buffers exist purely for performance and are part of the blueprint optimization performed on materialization.

From the Akka Streams documentation: "While pipelining in general increases throughput, in practice there is a cost of passing an element through the asynchronous (and therefore thread crossing) boundary which is significant. To amortize this cost Akka Streams uses a windowed, batching backpressure strategy internally. It is windowed because as opposed to a Stop-And-Wait protocol multiple elements might be “in-flight” concurrently with requests for elements. It is also batching because a new element is not immediately requested once an element has been drained from the window-buffer but multiple elements are requested after multiple elements have been drained. This batching strategy reduces the communication cost of propagating the backpressure signal through the asynchronous boundary."

It is no coincidence that the documentation covers internal buffers right next to explicit buffers, and that both are part of the "working with rate" section.

The internal buffer in question is the inputBuffer held by BatchingActorInputBoundary.

  /* Bad: same number of emitted and consumed events, i.e. DOES NOT DROP
  Emitted: 1
  Emitted: 1
  Emitted: 1
  Consumed: 1
  Emitted: 1
  Emitted: 1
  Consumed: 1
  Consumed: 1
  Consumed: 1
  Consumed: 1
  */
  // Assumes an implicit materializer in scope, plus imports of
  // akka.stream.OverflowStrategy, akka.stream.scaladsl._ and scala.concurrent.duration._
  def example1(): Unit = {
    val c = Source.tick(500.millis, 500.millis, 1)
      .map { x =>
        println("Emitted: " + x)
        x
      }
      .buffer(1, OverflowStrategy.dropBuffer).async
      .toMat(Sink.foreach[Int] { x =>
        Thread.sleep(5000) // slow consumer
        println("Consumed: " + x)
      })(Keep.left)
      .run()
    Thread.sleep(3000)
    c.cancel() // stop the tick source
  }

The example above, which was causing the (for me!) unexpected behavior, can be "solved" by reducing the size of the internal buffer with

      .toMat(Sink.foreach[Int] { x =>
        Thread.sleep(5000)
        println("Consumed: " + x)
      })(Keep.left) // must stay on the same line as the closing parenthesis of toMat
      .addAttributes(Attributes.inputBuffer(initial = 1, max = 1))

Now some elements from upstream are discarded, although a minimal input buffer of size 1 remains, and we obtain the following output:

Emitted: 1
Emitted: 1
Emitted: 1
Emitted: 1
Emitted: 1
Consumed: 1
Consumed: 1
Consumed: 1
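
Both outputs are consistent with a rough back-of-the-envelope model, sketched below in plain Scala. It is a deliberate simplification, not how Akka is implemented: it ignores batching and timing, and it assumes every element is emitted while the sink is still busy with the first one. BoundaryModel and survivors are hypothetical names, and the value 16 used in the note after the code is assumed to be the default maximum input buffer size.

```scala
object BoundaryModel {
  /** Number of elements that eventually reach the sink when `emitted` elements
    * arrive while the sink is still blocked on the first one.
    *
    * @param inputBuffer    size of the internal input buffer at the async boundary
    * @param explicitBuffer size of the explicit buffer using the dropBuffer strategy
    */
  def survivors(emitted: Int, inputBuffer: Int, explicitBuffer: Int): Int = {
    require(emitted >= 0 && inputBuffer >= 0 && explicitBuffer >= 1)
    val inFlight = math.min(emitted, 1)                      // element the sink is processing
    val absorbed = math.min(emitted - inFlight, inputBuffer) // parked in the internal input buffer
    val reachingBuffer = emitted - inFlight - absorbed       // arrive at the explicit buffer
    // dropBuffer clears the full buffer on overflow, so only the tail of the arrivals is kept
    val kept = if (reachingBuffer == 0) 0 else ((reachingBuffer - 1) % explicitBuffer) + 1
    inFlight + absorbed + kept
  }
}
```

Under these assumptions, survivors(5, 16, 1) gives 5, matching the first output where nothing is dropped, and survivors(5, 1, 1) gives 3, matching the three consumed elements above.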

I hope this answer adds value to stefano's answer.

The Akka team is always one step ahead; the documentation already says:

In general, when time or rate driven processing stages exhibit strange behavior, one of the first solutions to try should be to decrease the input buffer of the affected elements to 1.

UPDATE:

Konrad Malawski considered this a racy solution and recommended that I implement this behavior as a GraphStage. Here it is.

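import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

// Keeps only the most recent element: upstream is always pulled, and a newer
// element overwrites one that downstream has not yet asked for.
class LastElement[A] extends GraphStage[FlowShape[A, A]] {
  private val in = Inlet[A]("last-in")
  private val out = Outlet[A]("last-out")

  override val shape: FlowShape[A, A] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    // Latest element that has not been pushed downstream yet
    var pushPending: Option[A] = None

    override def preStart(): Unit = pull(in)

    def pushIfAvailable(): Unit = if (isAvailable(out)) {
      pushPending.foreach { p =>
        push(out, p)
        pushPending = None
      }
    }

    setHandler(out, new OutHandler {
      override def onPull(): Unit = pushIfAvailable()
    })

    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        pushPending = Some(grab(in)) // overwrite any element still waiting
        pushIfAvailable()
        pull(in) // keep draining upstream regardless of downstream demand
      }
    })
  }
}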