Process messages at a specific time reliably

Let's assume I have a chat application.

A client sends a message to a chat, which results in a command being sent to some actor. I want to process what the user wrote immediately and make it available to the other users in the chat, so I process this command right away. At the same time I want to tell myself (the actor) that this message must be stored in the chat history database, BUT not right now: saving to the database should happen every 2 minutes. And if there is a crash, I should still be able to persist the messages to the database.

I assume the workflow would look like this:

  1. The user sends a message
  2. The chat room actor receives a command with this message
  3. We broadcast this message to everyone and add it to some kind of queue so it can be persisted to the chat history database
  4. A persist command runs when the 2-minute timeout has passed. It collects all incoming chat messages that haven't been persisted yet, in the order of their arrival
  5. It runs a transaction with all the messages and then removes them from the queue.
  6. If there was a crash somewhere after step 3 and the messages weren't persisted, I should try to persist them again. If they were persisted, I should never try to persist them again.

How do I construct something like this in Akka? Which features and patterns should I use?

Answer by dk14 (best answer)

You may need two actors: one (the coordinator) broadcasts chat commands to the clients; the other (the throttler) pushes data to the database every 2 minutes. Your queue is then just the internal state of the throttler:

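import akka.actor.{Actor, ActorRef}
import scala.collection.mutable
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Record, Start, Tick, broadcast and push are placeholders defined elsewhere.

class Coordinator(throttler: ActorRef) extends Actor {
  def receive = {
    case command: Record =>
      broadcast(command)  // deliver to the chat participants immediately
      throttler ! command // queue for the delayed database write
  }
}


class Throttler extends Actor {

  import context.dispatcher // execution context required by the scheduler

  val queue = mutable.ListBuffer.empty[Record] // it may be a cache instead

  def schedule() = context.system.scheduler.scheduleOnce(2.minutes, self, Tick) // http://doc.akka.io/docs/akka/snapshot/scala/scheduler.html


  def receive = {
    case Start => schedule()
    case command: Record =>
      queue += command
    case Tick =>
      schedule() // re-arm the timer first so a failed flush is retried on the next Tick
      try {
        //---open transaction here---
        for (r <- queue) push(r)
        //---close transaction here---
        queue.clear() // will not be cleared in case of exception
      } catch {
        case NonFatal(_) => () // assumption: keep the queue and retry on the next Tick
      }
  }
}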
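
The retry behaviour of the Tick handler can be checked without Akka. Below is a minimal sketch in plain Scala, assuming the whole batch is written in one transaction; `FlushSketch`, `PendingQueue` and `writeBatch` are hypothetical names used only for illustration, not part of any library.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

object FlushSketch {
  final case class Record(id: Long, text: String)

  // Hypothetical stand-in for the throttler's internal queue.
  final class PendingQueue {
    private val queue = mutable.ListBuffer.empty[Record]

    def enqueue(r: Record): Unit = queue += r

    // Messages not yet written, in arrival order.
    def pending: List[Record] = queue.toList

    // Hands the whole batch to `writeBatch` (assumed to run as one transaction).
    // On success the queue is cleared; on failure nothing is removed,
    // so the same batch is offered again on the next flush.
    def flush(writeBatch: List[Record] => Unit): Boolean =
      try {
        writeBatch(queue.toList)
        queue.clear()
        true
      } catch {
        case NonFatal(_) => false
      }
  }
}
```

This only covers failures of the write itself. If the process dies, the in-memory queue is gone, which is what the replication and cache options below are for.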

You may also use an FSM-based implementation, as @abatyuk said.

If you need to decrease the load on the mailboxes, you could try backpressure/load-balancing patterns such as Akka Work Pulling.

If you want to protect against node failure (to recover the queue state if some of your server nodes fail), you can use Akka Cluster to replicate the queue's state manually. In that case the coordinator should be a Cluster Singleton; it should send the ticks itself to a random actor (you may use an event bus for this) and track their successes and failures as a supervisor. Note that the supervisor's state may also be lost, so you should replicate it across the nodes too and merge the state between them every 2 minutes. For that reason it's better to use a SortedSet for the queues, because merging then becomes something like sets.reduce(_ ++ _).
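
As a rough illustration of the merge step, here is a minimal sketch in plain Scala. It assumes each record carries a sequence number `seq` assigned on arrival (a hypothetical field; the original does not say how records are ordered), and `MergeSketch` is likewise just an illustrative name.

```scala
import scala.collection.immutable.SortedSet

object MergeSketch {
  // `seq` is an assumed per-room sequence number assigned on arrival.
  final case class Record(seq: Long, text: String)

  object Record {
    // Order by arrival; the text is only a tie-breaker.
    implicit val bySeq: Ordering[Record] =
      Ordering.by((r: Record) => (r.seq, r.text))
  }

  // Same idea as sets.reduce(_ ++ _), but also safe when there are no replicas.
  def merge(replicas: Seq[SortedSet[Record]]): SortedSet[Record] =
    replicas.foldLeft(SortedSet.empty[Record])(_ ++ _)
}
```

Because set union is commutative and idempotent, the nodes can merge in any order and re-merge the same replica without creating duplicates. The result also iterates in arrival order.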

Storages like Riak already provide a simple way to solve the clustering problem, so you can use them as the queue (then both the coordinator and the throttler become "stateless" singletons). With Riak you can configure it for availability and partition tolerance (AP, see the CAP theorem), because merging data is not a problem here: your chat history is a CRDT (conflict-free replicated data type).

Another solution is to use a cache in write-behind mode (configured to flush every 2 minutes) as the throttler.

Event sourcing could also protect your actor's state, but it's more useful when you need to redo all actions after a restore (you don't need this here: it would resubmit everything to the database). You can use snapshots (which is pretty much the same as using a cache directly), but it's better to save them to the cache (by implementing SnapshotStore) instead of the local file system if you care about availability. Note that you may also have to delete previously saved snapshots to reduce storage size, and you should save every record synchronously to avoid losing state.
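
To make the replay concern concrete, here is a minimal sketch in plain Scala (it does not use the Akka Persistence API). It assumes the journal records a marker after each successful flush, so that recovery rebuilds only the unflushed part of the queue; `JournalSketch`, `Queued`, `Flushed` and `recover` are hypothetical names.

```scala
object JournalSketch {
  // `seq` is an assumed sequence number assigned on arrival.
  final case class Record(seq: Long, text: String)

  sealed trait Event
  // Written synchronously when a message is accepted.
  final case class Queued(record: Record) extends Event
  // Written after a batch up to and including `upToSeq` was committed.
  final case class Flushed(upToSeq: Long) extends Event

  // Rebuilds the pending queue after a restart by replaying the journal.
  def recover(journal: Seq[Event]): Vector[Record] =
    journal.foldLeft(Vector.empty[Record]) {
      case (pending, Queued(r))     => pending :+ r
      case (pending, Flushed(upTo)) => pending.filter(_.seq > upTo)
    }
}
```

Without the `Flushed` marker, a replay would hand every record back to the database. With it, a crash between the database commit and the marker write can still cause one batch to be retried, so the write itself should tolerate duplicates.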

P.S. Don't forget to acknowledge each message to the sender (your JavaScript client), or you could lose the last messages (those still in mailboxes) even with a cache as the queue.
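
One possible shape for the acknowledgement, sketched in plain Scala. It assumes the client attaches its own id to every message and resends until it sees an ack; `AckSketch`, `clientMsgId`, `Ack` and `Inbox` are hypothetical names, and the original does not prescribe this protocol.

```scala
import scala.collection.mutable

object AckSketch {
  // `clientMsgId` is an assumed id generated by the client for each message.
  final case class ChatMessage(clientMsgId: String, text: String)
  final case class Ack(clientMsgId: String)

  final class Inbox {
    // Insertion-ordered, so accepted messages keep their arrival order.
    private val seen = mutable.LinkedHashMap.empty[String, ChatMessage]

    // Always acknowledges; a resend of an already accepted message is
    // acknowledged again but not stored a second time.
    def receive(msg: ChatMessage): Ack = {
      seen.getOrElseUpdate(msg.clientMsgId, msg)
      Ack(msg.clientMsgId)
    }

    def accepted: List[ChatMessage] = seen.values.toList
  }
}
```

The ack should be sent only once the message is safely in the queue (or cache). A lost message then shows up as a missing ack and triggers a resend.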

P.S. 2: A database is almost always a bad solution for persisting an actor's state, because it's slow and may become unavailable. I also wouldn't recommend strongly consistent NoSQL solutions like MongoDB; eventual consistency is the best choice in your case.