akka timer giving message Cancel timer [Message] with generation [583]


I am using Akka timers with a Twitter stream, and I am trying to get the number of tweets received in each 5-second window. Here is my code:

    case class PerSecond(tweet:Tweet)
    case class TweetPerSecondCount(tweet:Tweet)

    class TweetPerSecondActor extends Actor with Timers {
      var counter = 0

      def receive: PartialFunction[Any, Unit] = {
        case PerSecond(tweet) =>
          log.info("Actor TweetPerSecondActor recevied the message PerSecond")
          timers.startPeriodicTimer("perSecond", TweetPerSecondCount(tweet), 5.second)

        case TweetPerSecondCount(tweet) =>
          log.info("Actor TweetPerSecondActor recevied the message TweetPerSecondCount")
          log.info("got the tweet {}",getCounter+"in 5 seconds")

        case message =>
          log.warn("Actor TweetPerSecondActor: Unhandled message received : {}", message)
          unhandled(message)
      }
    }

In a Play Framework controller Action, I take the Tweet objects from the Twitter stream (a continuous stream that I never stop) and send each one to the actor:

    class Mycontroller extends Controller {

      val tweetPerSecondActor = system.actorOf......//create actor

      def tweetAveragePerSecond = Action {
        log.debug("in the action tweetAveragePerSecond")

        def getTweet: PartialFunction[StreamingMessage, Unit] = {
          case tweet: Tweet =>
            val future = ask(tweetPerSecondActor, PerSecond(tweet))
        }
        val streaming: Future[TwitterStream] = handleTwitterStreamClient.getStreamingCLient.sampleStatuses(stall_warnings = true)(getTweet)

        Ok("tweet average per second")
      }
    }

When I hit the route, the logs show the timer being cancelled and restarted over and over, and TweetPerSecondCount is never received:

TweetPerSecondActor INFO - Actor TweetPerSecondActor recevied the message PerSecond
16:42:44.335 28939 [ArteciateActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Cancel timer [perSecond] with generation [758]
16:42:44.335 28939 [ArteciateActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Start timer [perSecond] with generation [759]

    16:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-6] TweetPerSecondActor INFO - Actor TweetPerSecondActor recevied the message PerSecond
    16:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Cancel timer [perSecond] with generation [758]
    16:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Start timer [perSecond] with generation [759]

If I pass a dummy string value instead of running the stream and passing the Tweet object, the timer works fine, as shown below:

    case class PerSecond(tweet:String)
    case class TweetPerSecondCount(tweet:String)

    class TweetPerSecondActor extends Actor with Timers {
      var counter = 0

      def receive: PartialFunction[Any, Unit] = {
        case PerSecond(tweet) =>
          log.info("Actor TweetPerSecondActor recevied the message PerSecond")
          timers.startPeriodicTimer("perSecond", TweetPerSecondCount(tweet), 5.second)

        case TweetPerSecondCount(tweet) =>
          log.info("Actor TweetPerSecondActor recevied the message TweetPerSecondCount")
          log.info("got the tweet {}",getCounter+"in 5 seconds")

        case message =>
          log.warn("Actor TweetPerSecondActor: Unhandled message received : {}", message)
          unhandled(message)
      }
    }

    class Mycontroller extends Controller {

      val tweetPerSecondActor = system.actorOf......//create actor

      def tweetAveragePerSecond = Action {
        log.debug("in the action tweetAveragePerSecond")

        val future = ask(tweetPerSecondActor, PerSecond("dummy value"))

        Ok("tweet average per second")
      }
    }

There is 1 answer

Answer by Levi Ramsey:

Timers in Akka are keyed (in your case the key is "perSecond"), and the API guarantees that:

Each timer has a key and if a new timer with same key is started the previous is cancelled

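So every time the getTweet function is called (given that it's called for effect, not for value, I prefer "procedure", but that's perhaps idiosyncratic), the actor receives PerSecond and calls startPeriodicTimer with the same key, which cancels the previous timer and starts a new one. The stream evidently delivers tweets much more often than once every 5 seconds, so the timer is restarted before it ever gets a chance to fire, and TweetPerSecondCount is never delivered. In your dummy-string version, PerSecond is sent only once per request, so the timer is left alone and fires as expected.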
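To make the restart behaviour concrete, here is a minimal sketch of an actor that only starts the timer when no timer with that key is active, and counts tweets in between ticks. It is not code from the question or from this answer: the names TweetCountActor, TweetSeen, Report and the props helper are hypothetical, and it assumes classic Akka actors where startPeriodicTimer is available (it is deprecated since Akka 2.6 in favour of startTimerWithFixedDelay and startTimerAtFixedRate).

```scala
import akka.actor.{Actor, ActorLogging, Props, Timers}
import scala.concurrent.duration._

object TweetCountActor {
  // Hypothetical messages for this sketch. TweetSeen plays the role of the
  // question's PerSecond(tweet); Report plays the role of TweetPerSecondCount.
  case object TweetSeen
  private case object Report
  private val TimerKey = "perSecond"

  def props(window: FiniteDuration = 5.seconds): Props =
    Props(new TweetCountActor(window))
}

class TweetCountActor(window: FiniteDuration)
    extends Actor with ActorLogging with Timers {
  import TweetCountActor._

  private var counter = 0

  def receive: Receive = {
    case TweetSeen =>
      counter += 1
      // Starting a timer under an existing key cancels the old one,
      // so only start it if no timer with this key is currently active.
      if (!timers.isTimerActive(TimerKey))
        timers.startPeriodicTimer(TimerKey, Report, window)

    case Report =>
      log.info("got {} tweets in the last {}", counter, window)
      counter = 0 // begin a fresh window
  }
}
```

With this shape, the controller would send TweetSeen for every tweet, and the timer keeps its original schedule no matter how many tweets arrive. Starting the timer once in preStart would be an equally reasonable variant.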

Depending on what you're trying to accomplish, solutions could include:

  • Actor per request. The request would need to return something (an identifier, for example) that later requests can pass in to get information from that actor. Some form of actor lifecycle management will probably be desirable as well, so that actors are stopped once they are no longer needed.
  • Keep one actor for the whole controller but use a unique timer key per request. If these are periodic timers, you'll have to track which timer keys are active and cancel the ones that are no longer needed (keeping arbitrarily many periodic timers running is likely to eventually degrade performance).
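As a rough illustration of the second option, the sketch below keeps one actor, uses the request id as the timer key, and tracks which keys are active so they can be cancelled. Everything in it is an assumption made for the example: the PerRequestCounter actor, its StartCounting / TweetSeen / StopCounting protocol, and the idea that the controller generates a requestId string are all hypothetical, and the fixed 5-second window simply mirrors the question.

```scala
import akka.actor.{Actor, ActorLogging, Timers}
import scala.concurrent.duration._

object PerRequestCounter {
  // Hypothetical protocol: requestId is assumed to be generated by the controller.
  final case class StartCounting(requestId: String)
  final case class TweetSeen(requestId: String)
  final case class StopCounting(requestId: String)
  private final case class Report(requestId: String)
}

class PerRequestCounter extends Actor with ActorLogging with Timers {
  import PerRequestCounter._

  // requestId -> tweets seen in the current window.
  // The key set doubles as the record of which timer keys are active.
  private var counts = Map.empty[String, Int]

  def receive: Receive = {
    case StartCounting(id) if !counts.contains(id) =>
      counts += id -> 0
      // Unique key per request, so starting this timer cancels no other one.
      timers.startPeriodicTimer(id, Report(id), 5.seconds)

    case TweetSeen(id) if counts.contains(id) =>
      counts += id -> (counts(id) + 1)

    case Report(id) =>
      counts.get(id).foreach { n =>
        log.info("request {}: {} tweets in the last 5 seconds", id, n)
        counts += id -> 0 // begin a fresh window for this request
      }

    case StopCounting(id) =>
      // Cancel timers that are no longer needed so they do not pile up.
      timers.cancel(id)
      counts -= id
  }
}
```

Messages that fail the guards (a second StartCounting for the same id, or a TweetSeen for an unknown id) fall through to Akka's default unhandled handling. In a real controller, something would also have to decide when to send StopCounting, for example when the stream is closed.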