I just followed the Akka Streams ActorPublisher example and sometimes I get this message:
java.lang.IllegalStateException: onNext is not allowed when the stream has not requested elements, totalDemand was 0
Looking at the docs, they explain:
You send elements to the stream by calling onNext. You are allowed to send as many elements as have been requested by the stream subscriber. This amount can be inquired with totalDemand. It is only allowed to use onNext when isActive and totalDemand>0, otherwise onNext will throw IllegalStateException.
When the stream subscriber requests more elements the ActorPublisherMessage.Request message is delivered to this actor, and you can act on that event. The totalDemand is updated automatically.
How can I prevent totalDemand from being zero? When I get this error, I lose the message I was trying to send.
Here is the example I've been following:
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html
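and this is my test class:
import akka.actor.{ActorSystem, Props}
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.softwaremill.react.kafka.ReactiveKafka
import kafka.serializer.StringDecoder

// Worker and WorkerPool are my own classes (not shown here).
object Test extends App {
  implicit val actorSystem = ActorSystem("ReactiveKafka")
  implicit val materializer = ActorFlowMaterializer()
  val kafka = new ReactiveKafka(host = "localhost:9092", zooKeeperHost = "localhost:2181")
  val publisher = kafka.consume("test", "groupName", new StringDecoder())
  val workerActor = actorSystem.actorOf(Props[Worker], name = "workerActor")
  Source(publisher).map(WorkerPool.Msg(_, workerActor)).runWith(Sink.actorSubscriber(WorkerPool.props))
}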
I receive the messages from Kafka and pass them to the Worker actor; however, when sending around 10 messages/sec to Kafka, some of them are lost due to this error.
UPDATE
I was facing the error described here (using the same library):
https://github.com/softwaremill/reactive-kafka/issues/11
I solved mine using a buffer, but it looks like this PR will solve the problem.
If the downstream sink doesn't have any demand, then your only options are:
1. drop messages
2. buffer messages
3. signal to Worker that there is no demand, so that the source can stop producing messages until more demand comes in (the reactive solution).
But the whole point of "back-pressure" is to prevent onNext from being called when there is no demand.
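One way to make sure onNext is never called without demand is to hold elements inside the publisher until a Request arrives. The following is only a minimal sketch, assuming the Akka Streams 1.0-RC3 ActorPublisher API from the linked docs; the class name BufferingPublisher, the String element type, and the MaxBufferSize bound are hypothetical choices for illustration.

```scala
import akka.actor.Props
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}

// Hypothetical publisher: keeps elements in a bounded buffer and only
// emits them while the subscriber has outstanding demand.
class BufferingPublisher extends ActorPublisher[String] {
  val MaxBufferSize = 1000 // assumed bound, tune for your workload
  var buf = Vector.empty[String]

  def receive = {
    case msg: String =>
      // When the buffer is full the new element is dropped.
      if (buf.size < MaxBufferSize) buf :+= msg
      deliverBuf()
    case Request(_) =>
      // totalDemand has already been updated when this message arrives.
      deliverBuf()
    case Cancel =>
      context.stop(self)
  }

  // Emit buffered elements, but never more than the subscriber requested.
  private def deliverBuf(): Unit =
    while (isActive && totalDemand > 0 && buf.nonEmpty) {
      onNext(buf.head)
      buf = buf.tail
    }
}

object BufferingPublisher {
  def props: Props = Props[BufferingPublisher]
}
```

Because every onNext call sits behind the totalDemand > 0 check, the IllegalStateException from the question cannot be triggered by this actor; elements that arrive while demand is zero simply wait in buf.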
To implement the buffering option above, you can buffer either inside your Actor or outside it, using Flow.buffer in your stream.
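As a rough sketch of buffering outside the actor, assuming the same Akka Streams 1.0-RC3 API as the question (ActorFlowMaterializer, Akka 2.3 system.shutdown()): a plain range stands in for the Kafka publisher, and the object name BufferSketch and the buffer size of 100 are arbitrary choices for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorFlowMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BufferSketch {
  // Runs a small stream with an explicit buffer stage and returns the sum
  // of all elements that reached the sink.
  def run(): Int = {
    implicit val system = ActorSystem("BufferSketch")
    implicit val materializer = ActorFlowMaterializer()
    try {
      val sum = Source(1 to 100) // stand-in for Source(publisher)
        // Hold up to 100 elements; when full, back-pressure the upstream.
        .buffer(100, OverflowStrategy.backpressure)
        .runWith(Sink.fold[Int, Int](0)(_ + _))
      Await.result(sum, 5.seconds)
    } finally system.shutdown()
  }
}
```

In the question's pipeline the buffer stage would sit between Source(publisher) and .map(...). With OverflowStrategy.backpressure nothing is discarded, whereas a strategy such as OverflowStrategy.dropHead discards elements when the buffer is full. Note that a buffer only absorbs bursts: a publisher that calls onNext without checking totalDemand can still fail once the buffer fills up.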