Suppose the application office has three EventSourcedBehavior actors:
trait OfficeFridgeCommand
case object OpenFridge extends OfficeFridgeCommand
case object CloseFridge extends OfficeFridgeCommand
trait OfficeFridgeEvent
case object FridgeOpened extends OfficeFridgeEvent
case object FridgeClosed extends OfficeFridgeEvent
trait OfficeCoffeeMachineCommand
case object MakeCoffee extends OfficeCoffeeMachineCommand
trait OfficeCoffeeMachineEvent
case object CoffeeMade extends OfficeCoffeeMachineEvent
val fridgeEntity =
  EventSourcedBehavior[OfficeFridgeCommand, OfficeFridgeEvent, OfficeFridge]()
val frontDoorEntity =
  EventSourcedBehavior[OfficeFrontDoorCommand, OfficeFrontDoorEvent, OfficeFridge]()
val coffeeMachineEntity =
  EventSourcedBehavior[OfficeCoffeeMachineCommand, OfficeCoffeeMachineEvent, OfficeFridge]()
Suppose there was some activity with the fridge and 1000 events were recorded for it under various persistence ids [0-1000],
so that the journal looks like:
| ordering | persistence_id | event_ser_manifest |
|---|---|---|
| 1 | 200 | FridgeOpened |
| 2 | 200 | FridgeClosed |
| ... | ... | ... |
| 500 | 500 | FridgeOpened |
| ... | ... | ... |
| 1000 | 501 | FridgeClosed |
If a GetCoffeeMachineState message with persistence id 500 arrived at the coffeeMachineEntity actor, the actor would attempt to replay the journal events with persistence_id = 500.
It would fail because it cannot cast an OfficeFridgeEvent to the OfficeCoffeeMachineEvent type the actor expects (Akka Typed, remember?).
Is this a common setup for this type of system? Or does every entity require its own DB, with an event journal that contains only the "valid" event types the actor accepts?
I am seeing this exact issue in my system right now. If someone (by accident) were to run 1000 of these queries, I would have 1000 entity actors attempting to replay these events forever, or until I restarted the pods.
What I end up with is endless attempts to restart the entity actor, with the following stack trace:
at akka.persistence.typed.internal.ReplayingEvents.onJournalResponse(ReplayingEvents.scala:200)
at akka.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:98)
at akka.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:73)
Caused by: java.lang.ClassCastException
which makes sense, because a typed actor is attempting to process events of a different type.
You said you have more than one entity (`fridge`, `front door` and `coffee machine` in your example). Each entity replies to different commands and persists different events.
When you create an `EventSourcedBehavior` using `EventSourcedBehavior.apply()`, the first parameter is a `PersistenceId`. You are the one in charge of making sure that ID is unique. That object offers a factory method that asks you for a type hint and the entityId.
This is detailed in Event Sourcing - PersistenceId.
You can take the Akka Persistence shopping cart sample - ShoppingCart Behavior as a good example.
Your code should be something like
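A minimal sketch of the idea, not taken from the original post: it assumes the fridge entity is wrapped in an `OfficeFridge` object and uses the string `"OfficeFridge"` as its type hint. The `State` class, the `TypeHint` constant and the `persistenceId` helper are hypothetical names introduced for illustration. The `PersistenceId(entityTypeHint, entityId)` factory joins hint and entity id with `|` by default.

```scala
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}

object OfficeFridge {
  sealed trait Command
  case object OpenFridge extends Command
  case object CloseFridge extends Command

  sealed trait Event
  case object FridgeOpened extends Event
  case object FridgeClosed extends Event

  // Hypothetical state: only tracks whether the door is open.
  final case class State(open: Boolean)

  // Assumed type hint; it namespaces all fridge streams in a shared journal.
  val TypeHint = "OfficeFridge"

  // Builds ids such as "OfficeFridge|500" instead of a bare "500".
  def persistenceId(entityId: String): PersistenceId =
    PersistenceId(TypeHint, entityId)

  def apply(entityId: String): Behavior[Command] =
    EventSourcedBehavior[Command, Event, State](
      persistenceId = persistenceId(entityId),
      emptyState = State(open = false),
      commandHandler = (_, command) =>
        command match {
          case OpenFridge  => Effect.persist(FridgeOpened)
          case CloseFridge => Effect.persist(FridgeClosed)
        },
      eventHandler = (_, event) =>
        event match {
          case FridgeOpened => State(open = true)
          case FridgeClosed => State(open = false)
        }
    )
}
```

The coffee machine and front door entities would follow the same pattern with their own hints (for instance `"OfficeCoffeeMachine"`), so a coffee machine with entityId 500 replays only the `OfficeCoffeeMachine|500` stream and never sees fridge events.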
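Once you persist an event for the entity in the journal, the persistence_id column in the database carries the type hint as a prefix. For example, with a hint of OfficeFridge you would see OfficeFridge|500 rather than a bare 500, so entities of different types no longer share the same event stream.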
Do I need to put everything in the same DB? Should I use a relational or non-relational DB?
As always, it depends. If you need to persist millions of events per second and those events come from millions of different devices, the answer will be NO. Akka Persistence offers several persistence plugins.
Each of them has its own pros and cons.
You can find more plugins at scala index - akka persistence, but they may be outdated, lack commercial support, and so on.