Akka Stream OnNext is not allowed

889 views Asked by At

I just followed the akka stream ActorPublisher example and sometime I got this message:

java.lang.IllegalStateException: onNext is not allowed when the stream has not requested elements, totalDemand was 0

looking at the docs, they explain:

You send elements to the stream by calling onNext. You are allowed to send as many elements as have been requested by the stream subscriber. This amount can be inquired with totalDemand. It is only allowed to use onNext when isActive and totalDemand>0, otherwise onNext will throw IllegalStateException.

When the stream subscriber requests more elements the ActorPublisherMessage.Request message is delivered to this actor, and you can act on that event. The totalDemand is updated automatically.

How can I prevent totalDemand to be zero? When I got this error, I lost the message I was trying to send.

Here is the example I've been following:

http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html

and this is my class test

object Test extends App {

  implicit val actorSystem = ActorSystem("ReactiveKafka")
  implicit val materializer = ActorFlowMaterializer()

  val kafka = new ReactiveKafka(host = "localhost:9092", zooKeeperHost = "localhost:2181")
  val publisher = kafka.consume("test", "groupName", new StringDecoder())

  val workerActor = actorSystem.actorOf(Props[Worker], name = "workerActor")

  Source(publisher).map(WorkerPool.Msg(_, workerActor)).runWith(Sink.actorSubscriber(WorkerPool.props))

}

Well, I received the message from kafka and I'm passing to the WorkerActor, however when sending like 10 messages/sec to Kafka, some of them are lost due this error.

UPDATE

I was facing the error described here (using the same library):

https://github.com/softwaremill/reactive-kafka/issues/11

I solved mine using a buffer, but looks like this PR will solve the problem.

https://github.com/softwaremill/reactive-kafka/pull/13

1

There are 1 answers

6
Ramón J Romero y Vigil On BEST ANSWER

If the down stream sink doesn't have any demand then your only options are

  1. tell the data source feeding Worker that there is no demand so that the source can stop producing messages until more demand comes in (the reactive solution).
  2. buffer the messages until you get some demand from the sink which could potentially fill up your buffer and you drop messages anyway.
  3. drop the messages when demand is 0 (which seems to be your current implementation).

But the whole point of "back-pressure" is to prevent onNext from being called when there is no demand.

To implement the buffering option above you can either buffer inside your Actor or outside:

  • Internal Buffer: look at the "ActorPublisher" example in the documentation for an example of buffering in an Actor that feeds an ActorPublisher.
  • External Buffer: Use an external buffer using either a buffered materializer or Flow.buffer in your stream.