Designing Akka Supervisor Hierarchy

213 views Asked by At

Please note: I am a Java developer with no working knowledge of Scala (sadly). I would ask that any code examples provided in the answer would be using Akka's Java API.

I am brand-spanking-new to Akka and actors, and am trying to set up a fairly simple actor system:

enter image description here

So a DataSplitter actor runs and splits up a rather large chunk of binary data, say 20GB, into 100 KB chunks. For each chunk, the data is stored in the DataCache via the DataCacher. In the background, a DataCacheCleaner rummages through the cache and finds data chunks that it can safely delete. This is how we prevent the cache from becoming 20GB in size.

After sending the chunk off to the DataCacher for caching, the DataSplitter then notifies the ProcessorPool of the chunk which now needs to be processed. The ProcessorPool is a router/pool consisting of tens of thousands of different ProcessorActors. When each ProcessActor receives a notification to "process" a 100KB chunk of data, it then fetches the data from the DataCacher and does some processing on it.

If you're wondering why I am bothering even caching anything here (hence the DataCacher, DataCache and DataCacheCleaner), my thinking was that 100KB is still a fairly large message to pass around to tens of thousands of actor instances (100KB * 1,000 = 100MB), so I am trying to just store the 100KB chunk once (in a cache) and then let each actor access it by reference through the cache API.

There is also a Mailman actor that subscribes to the event bus and intercepts all DeadLetters.

So, altogether, 6 actors:

  • DataSplitter
  • DataCacher
  • DataCacheCleaner
  • ProcessorPool
  • ProcessorActor
  • Mailman

The Akka docs preach that you should decompose your actor system based on dividing up subtasks rather than purely by function, but I'm not exactly seeing how this applies here. The problem at hand is that I'm trying to organize a supervisor hierarchy between these actors and I'm not sure what the best/correct approach is. Obviously ProcessorPool is a router that needs to be the parent/supervisor to the ProcessorActors, so we have this known hierarchy:

/user/processorPool/
    processorActors

But other than that known/obvious relationship, I'm not sure how to organize the rest of my actors. I could make them all "peers" under one common/master actor:

/user/master/
    dataSplitter/
    dataCacher/
    dataCacheCleaner/
    processorPool/
        processorActors/
    mailman/

Or I could omit a master (root) actor and try to make things more vertical around the cache:

/user/
    dataSplitter/
    cacheSupervisor/
        dataCacher/
        dataCacheCleaner/
    processorPool/
        processorActors/
    mailman/

Being so new to Akka I'm just not sure what the best course of action is, and if someone could help with some initial hand-holding here, I'm sure the lightbulbs will all turn on. And, just as important as organizing this hierarchy is, I'm not even sure what API constructs I can use to actually create the hierarchy in the code.

1

There are 1 answers

0
biniam On

Organising them under one master makes it easier to manage since you can access all the actors watched by the supervisor (in this case master).

One hierarchical implementation can be:

Master Supervisor Actor

class MasterSupervisor extends UntypedActor {

private static SupervisorStrategy strategy = new AllForOneStrategy(2,
        Duration.create(5, TimeUnit.MINUTES),

        new Function<Throwable, Directive>() {
            @Override
            public Directive apply(Throwable t) {

                if (t instanceof SQLException) {
                    log.error("Error: SQLException")
                    return restart()
                } else if (t instanceof IllegalArgumentException) {
                    log.error("Error: IllegalArgumentException")
                    return stop()
                } else {
                    log.error("Error: GeneralException")
                    return stop()
                }
            }
        });

@Override
public SupervisorStrategy supervisorStrategy() { return strategy }

@Override
void onReceive(Object message) throws Exception {
     if (message.equals("SPLIT")) {
          // CREATE A CHILD OF MyOtherSupervisor
          if (!dataSplitter) {
              dataSplitter = context().actorOf(FromConfig.getInstance().props(Props.create(DataSplitter.class)), "DataSplitter")

              // WATCH THE CHILD
              context().watch(dataSplitter)

              log.info("${self().path()} has created, watching and sent JobId = ${message} message to DataSplitter")
          }

          // do something with message such as Forward
          dataSplitter.forward(message, context())
      }
}

DataSplitter Actor

class DataSplitter extends UntypedActor {

    // Inject a Service to do the main operation
    DataSplitterService dataSplitterService

    @Override
    void onReceive(Object message) throws Exception {
        if (message.equals("SPLIT")) {
            log.info("${self().path()} recieved message: ${message} from ${sender()}")
            // do something with message such as Forward
            dataSplitterService.splitData()
        }
    }
}