Akka Scala EventBus with different classifiers depending on the subscriber


I am studying the Akka EventBus to see whether it can solve one of my design problems, and I am still not sure. The problem is the following.

To simplify, I have:

case class Request(requesterId: String, operation: String, header: RequestHeader)
case class Response(requesterId: String, operation: String, header: ResponseHeader)

I have several actors with different functions. I want some actors to subscribe to Response depending on the requesterId, and others depending on the operation. Is there an easy way to achieve that with EventBus and classifiers?

Thanks, Joel


There are 2 answers

2
Konrad 'ktoso' Malawski

Sure, it's called lookup classification (the LookupClassification trait in Scala, LookupEventBus in the Java API). You implement your own bus by mixing it into EventBus and extracting the requesterId in the classify method, like this:

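import akka.actor.ActorRef
import akka.event.{EventBus, LookupClassification}
class LookupBusImpl extends EventBus with LookupClassification {
  type Event = HasRequesterId // I made up a super type for you here
  type Classifier = String
  type Subscriber = ActorRef

  override protected def classify(event: HasRequesterId): String = event.requesterId
  override protected def publish(event: HasRequesterId, subscriber: ActorRef): Unit = subscriber ! event
  override protected def compareSubscribers(a: ActorRef, b: ActorRef): Int = a.compareTo(b)
  override protected def mapSize(): Int = 128 // initial size of the classifier index
}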

Then you'd subscribe to a given requesterId, like so:

  lookupBus.subscribe(actorRef, "requester-100")

This actor will then receive only messages which have been classified as requester-100.
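To see the subscription above end to end, here is a minimal sketch, assuming classic Akka actors. The HasRequesterId trait, the simplified Response (header omitted), the Printer actor and the LookupBusDemo object are all made up for illustration; the bus is repeated so the snippet compiles on its own.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.event.{EventBus, LookupClassification}

// Hypothetical super type and a simplified Response (header omitted).
trait HasRequesterId { def requesterId: String }
final case class Response(requesterId: String, operation: String) extends HasRequesterId

class LookupBusImpl extends EventBus with LookupClassification {
  type Event = HasRequesterId
  type Classifier = String
  type Subscriber = ActorRef

  // The classifier of an event is its requesterId.
  override protected def classify(event: Event): Classifier = event.requesterId
  // Deliver the event to one subscriber registered for that classifier.
  override protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber ! event
  override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = a.compareTo(b)
  override protected def mapSize(): Int = 128
}

// Hypothetical subscriber that just prints what it receives.
class Printer extends Actor {
  def receive: Receive = { case r: Response => println(s"${self.path.name} got $r") }
}

object LookupBusDemo extends App {
  val system = ActorSystem("demo")
  val lookupBus = new LookupBusImpl
  val actorRef = system.actorOf(Props(new Printer), "printer")

  lookupBus.subscribe(actorRef, "requester-100")
  lookupBus.publish(Response("requester-100", "read")) // delivered to printer
  lookupBus.publish(Response("requester-200", "read")) // no subscriber for this classifier

  Thread.sleep(500) // crude wait for delivery; acceptable only in a demo
  system.terminate()
}
```

Note that this bus only classifies by requesterId; classifying by operation needs a second bus with a different classify.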

0
cmbaxter

I agree with Konrad that you should implement new LookupClassification buses to solve your problem. I think it's simplest to have two separate instances of these buses, one that classifies by requesterId and the other by operation. Some of the basic setup work for this approach would be:

import akka.actor.ActorRef
import akka.event.{ActorEventBus, LookupClassification}

//Singleton to hold the instances of each stream type
object ResponseEventStream{
  val RequestorIdStream = new RequestorIdResponseEventStream
  val OperationStream = new OperationResponseEventStream
}

//Common functionality for the two different types of streams
trait ResponseEventStream extends ActorEventBus with LookupClassification{
  import ResponseEventStream._
  type Event = Response
  type Classifier = String  
  protected def mapSize = 128
  protected def publish(resp: Response, subscriber: ActorRef): Unit = {
    // ActorRef.isTerminated is deprecated since Akka 2.2; watch subscribers
    // and unsubscribe them on Terminated instead of polling here.
    subscriber ! resp
  }
}

//Concrete impl that uses requesterId to classify
class RequestorIdResponseEventStream extends ResponseEventStream{
  protected def classify(resp:Response) = resp.requesterId 
}

//Concrete impl that uses operation to classify
class OperationResponseEventStream extends ResponseEventStream{
  protected def classify(resp:Response) = resp.operation 
}

//Trait to mix into classes that need to publish or subscribe to response events
//Has helper methods to simplify interaction with the two distinct streams
trait ResponseEventing{
  import ResponseEventStream._

  def publishResponse(resp: Response): Unit = {
    RequestorIdStream.publish(resp)
    OperationStream.publish(resp)
  }

  def subscribeByRequestId(requestId: String, ref: ActorRef): Unit = {
    RequestorIdStream.subscribe(ref, requestId)
  }

  def subscribeByOperation(op: String, ref: ActorRef): Unit = {
    OperationStream.subscribe(ref, op)
  }
}

Then you just need to mix that ResponseEventing trait into actors that need to either publish Response events or actors that need to subscribe to them. Actors that are publishing will call publishResponse and actors that need to subscribe will call subscribeXXX depending on which classification (requesterId or operation) they are interested in.
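As a rough sketch of that mixing-in, assuming classic Akka actors: the streams and the trait are restated compactly so the snippet compiles on its own, Response is simplified (header omitted), the helper is spelled subscribeByOperation, and AuditActor, ClientActor, unsubscribeAll and the demo object are hypothetical names that are not part of the setup above.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.event.{ActorEventBus, LookupClassification}

// Simplified stand-in for the question's Response (header omitted).
final case class Response(requesterId: String, operation: String)

trait ResponseEventStream extends ActorEventBus with LookupClassification {
  type Event = Response
  type Classifier = String
  protected def mapSize(): Int = 128
  protected def publish(resp: Response, subscriber: ActorRef): Unit = subscriber ! resp
}
class RequestorIdResponseEventStream extends ResponseEventStream {
  protected def classify(resp: Response): String = resp.requesterId
}
class OperationResponseEventStream extends ResponseEventStream {
  protected def classify(resp: Response): String = resp.operation
}
object ResponseEventStream {
  val RequestorIdStream = new RequestorIdResponseEventStream
  val OperationStream = new OperationResponseEventStream
}

trait ResponseEventing {
  import ResponseEventStream._
  def publishResponse(resp: Response): Unit = {
    RequestorIdStream.publish(resp)
    OperationStream.publish(resp)
  }
  def subscribeByRequestId(requestId: String, ref: ActorRef): Unit =
    RequestorIdStream.subscribe(ref, requestId)
  def subscribeByOperation(op: String, ref: ActorRef): Unit =
    OperationStream.subscribe(ref, op)
  // Hypothetical helper: remove a ref from both streams.
  def unsubscribeAll(ref: ActorRef): Unit = {
    RequestorIdStream.unsubscribe(ref)
    OperationStream.unsubscribe(ref)
  }
}

// Hypothetical subscriber interested in one operation, whoever requested it.
class AuditActor extends Actor with ResponseEventing {
  override def preStart(): Unit = subscribeByOperation("delete", self)
  override def postStop(): Unit = unsubscribeAll(self)
  def receive: Receive = { case r: Response => println(s"audit saw $r") }
}

// Hypothetical subscriber interested in one requester, whatever the operation.
class ClientActor(requesterId: String) extends Actor with ResponseEventing {
  override def preStart(): Unit = subscribeByRequestId(requesterId, self)
  override def postStop(): Unit = unsubscribeAll(self)
  def receive: Receive = { case r: Response => println(s"$requesterId saw $r") }
}

object ResponseEventingDemo extends App with ResponseEventing {
  val system = ActorSystem("demo")
  system.actorOf(Props(new AuditActor), "audit")
  system.actorOf(Props(new ClientActor("requester-100")), "client")
  Thread.sleep(500) // crude: give preStart time to register the subscriptions
  publishResponse(Response("requester-100", "read"))   // matches the client only
  publishResponse(Response("requester-200", "delete")) // matches the audit actor only
  Thread.sleep(500)
  system.terminate()
}
```

Unsubscribing in postStop keeps stopped actors out of the streams. One thing to keep in mind with two buses: an actor subscribed on both streams receives a matching Response twice, once per stream.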