I have a use case where I need to process requests with an Akka FSM as soon as the number of queued requests reaches a specified value.
    import akka.actor.{ActorSystem, FSM, Props}
    import scala.collection.mutable

    sealed trait State
    case object Idle extends State
    case object Active extends State

    sealed trait Data
    case object Uninitialized extends Data
    case object QuickStart extends Data
    case class A(a: Int) extends Data

    class RequestHandlers extends FSM[State, Data] {
      val queue = mutable.Queue[A]()

      startWith(Idle, Uninitialized)

      when(Idle) {
        case Event(_, Uninitialized) =>
          println("At Idle")
          // self ! QuickStart
          goto(Active) using QuickStart
      }

      when(Active) {
        case Event(_, request: A) =>
          println("At Active")
          queue.take(2).map { x =>
            println("request--- " + x.a + "processing")
            queue.dequeue()
          }
          Thread.sleep(2000L)
          goto(Active) using Uninitialized
      }

      whenUnhandled {
        case Event(update: A, QuickStart) =>
          queue += update
          if (queue.size >= 2) {
            println(s"At unhandled + ${update}" + "--" + queue)
            goto(Active) using update
          }
          else {
            println("size has not reached")
            goto(Active) using Uninitialized
          }
        case Event(update: A, Uninitialized) =>
          queue += update
          println(s"At unhandled - Uninitialised + $update")
          goto(Active) using QuickStart
      }

      initialize()
    }

    object demo extends App {
      val actorSystem = ActorSystem("system")
      val actor = actorSystem.actorOf(Props(classOf[RequestHandlers]))
      val list = (1 to 10).toList
      list.foreach { abc =>
        actor ! Uninitialized
        actor ! A(abc)
        println("Sent")
      }
    }
I tried using a mutable queue to which I add each request. Once the size of the queue reaches a certain value (2 here), those requests should be processed together, and I dequeue them after processing. If I send 10 requests, 8 of them are processed, but for the last 2 the FSM never goes to the Active state. I can't see where my transition logic goes wrong.
Any help will be appreciated!
I think that a minimal example of what you are doing, together with a test program, looks like this:
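A minimal sketch of both pieces, assuming the Akka classic FSM API (`akka.actor.FSM`) and a batch size of 2 as in the question. The names `Waiting`, `Active`, `StateData` and `RequestHandlers` follow the explanation below; `BatchSize`, `processBatch` and `Demo` are hypothetical names introduced for illustration, and the `println` only stands in for the real processing:

```scala
import akka.actor.{ActorSystem, FSM, Props}
import scala.collection.immutable.Queue

object RequestHandlers {
  sealed trait State
  case object Waiting extends State
  case object Active extends State

  final case class A(a: Int)
  // One data type shared by both states: the requests collected so far.
  final case class StateData(queue: Queue[A])

  val BatchSize = 2 // assumed threshold, taken from the question
}

class RequestHandlers extends FSM[RequestHandlers.State, RequestHandlers.StateData] {
  import RequestHandlers._

  startWith(Waiting, StateData(Queue.empty))

  // Hypothetical hook for the real work; here it only prints.
  protected def processBatch(batch: Seq[A]): Unit =
    batch.foreach(r => println(s"processing request ${r.a}"))

  when(Waiting) {
    case Event(update: A, data) =>
      val newQueue = data.queue :+ update
      if (newQueue.size == BatchSize) goto(Active) using StateData(newQueue)
      else stay() using StateData(newQueue)
  }

  when(Active) {
    // A request that arrives while Active starts the next batch.
    case Event(update: A, _) =>
      goto(Waiting) using StateData(Queue(update))
  }

  onTransition {
    // The full batch is processed on the Waiting -> Active transition.
    case Waiting -> Active => processBatch(nextStateData.queue)
  }

  initialize()
}

object Demo extends App {
  val system = ActorSystem("system")
  val handler = system.actorOf(Props(classOf[RequestHandlers]))
  (1 to 10).foreach { i =>
    handler ! RequestHandlers.A(i)
    println(s"Sent $i")
  }
  Thread.sleep(1000) // crude wait so the actor can drain its mailbox
  system.terminate()
}
```

Keeping the queue in the immutable state data, rather than in a mutable field of the actor, means that every transition states explicitly which requests are carried over.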
The logic of `RequestHandlers` is that it accumulates incoming requests in a queue stored inside a `StateData` object (which has only one type that is shared between both states). There are two states, `Waiting` and `Active`. The processing actually happens on the transition `Waiting` -> `Active`. Probably the trickiest point is not to forget that while the FSM is in the `Active` state, new messages will still arrive and should be handled by adding them to a queue (or rather by starting a new queue with the data from that message).

P.S. Well, this example is probably not that minimal. In fact, you could have just one state and do the processing inside `if (newQueue.size == 2)`, but that would be quite a strange FSM.
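For comparison, the single-state variant mentioned in the P.S. could be sketched as follows, again assuming the Akka classic FSM API. `SingleStateHandler`, `Collecting` and `processBatch` are hypothetical names introduced for illustration, and the `println` stands in for the real processing:

```scala
import akka.actor.FSM
import scala.collection.immutable.Queue

object SingleStateHandler {
  sealed trait State
  case object Collecting extends State

  final case class A(a: Int)
  final case class StateData(queue: Queue[A])
}

class SingleStateHandler
    extends FSM[SingleStateHandler.State, SingleStateHandler.StateData] {
  import SingleStateHandler._

  startWith(Collecting, StateData(Queue.empty))

  // Hypothetical hook for the real work; here it only prints.
  protected def processBatch(batch: Seq[A]): Unit =
    batch.foreach(r => println(s"processing request ${r.a}"))

  when(Collecting) {
    case Event(update: A, data) =>
      val newQueue = data.queue :+ update
      if (newQueue.size == 2) {
        // Process in place and start over with an empty queue.
        processBatch(newQueue)
        stay() using StateData(Queue.empty)
      } else {
        stay() using StateData(newQueue)
      }
  }

  initialize()
}
```

With only one state there are no transitions left to hook into, which is why this reads more like a plain actor than a state machine.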