Non-blocking updates of mutable state with Akka Actors

EDIT: clarification of intent:

I have a Scala computation (5-10 seconds) that aggregates some data from many AWS S3 objects at a given point in time. I want to make this information available through a REST API. I'd also like to update this information every minute or so to pick up new objects that have been written to the bucket in the interim. The summary itself will be a large JSON blob, and I can save a lot of AWS calls by caching the results of my S3 API calls from previous updates (since these objects are immutable).

I'm currently writing this Spray.io based REST service in Scala. I'd like the REST server to continue serving 'stale' data even if a computation is currently taking place. Then once the computation is finished, I'd like to atomically start serving requests of the new data snapshot.

My initial idea was to have two actors, one doing the Spray routing and serving, and the other handling the long running computation and feeding the most recent cached result to the routing actor:

class MyCompute extends Actor {
  var myvar = 1.0 // will eventually be several megabytes of state
  import context.dispatcher
  // [ALTERNATIVE A]:
  // def compute() = this.synchronized { Thread.sleep(3000); myvar += 1.0 }
  // [ALTERNATIVE B]:
  // def compute() = { Thread.sleep(3000); this.synchronized { myvar += 1.0 }}
  def compute() = { Thread.sleep(3000); myvar += 1.0 }

  def receive = {
    case "compute" => {
      compute() // BAD: blocks this thread!
      // [FUTURE]:
      // Future(compute()) // BAD: not thread-safe (needs import scala.concurrent.Future)
    }
    case "retrieve" => { 
      sender ! myvar 
      // [ALTERNATIVE C]:
      // sender ! this.synchronized { myvar }
    }
  }
}

class MyHttpService(val dataService:ActorRef) extends HttpServiceActor {

  implicit val timeout = Timeout(1 seconds)
  import context.dispatcher
  def receive = runRoute {
    path("ping") {
      get {
        complete {
          (dataService ? "retrieve").map(_.toString).mapTo[String]
        }
      }
    } ~
    path("compute") {
      post {
        complete {
          dataService ! "compute"
          "computing.."
        }
      }
    }
  }
}

object Boot extends App {
  implicit val system = ActorSystem("spray-sample-system")
  implicit val timeout = Timeout(1 seconds)

  val dataService = system.actorOf(Props[MyCompute], name="MyCompute")
  val httpService = system.actorOf(Props(classOf[MyHttpService], dataService), name="MyRouter")
  val cancellable = system.scheduler.schedule(0 milliseconds, 5000 milliseconds, dataService, "compute")

  IO(Http) ? Http.Bind(httpService, system.settings.config.getString("app.interface"), system.settings.config.getInt("app.port"))

}

As things are written, everything is safe, but when passed a "compute" message, the MyCompute actor will block its thread and will not be able to serve "retrieve" requests from the MyHttpService actor until the computation finishes.

Some alternatives:

akka.agent

The akka.agent.Agent looks like it is designed to handle this problem nicely (replacing the MyCompute actor with an Agent), except that it seems to be designed for simpler state updates. In reality, MyCompute will have multiple pieces of state (some of which are data structures of several megabytes), and using sendOff would seemingly rewrite all of that state on every update, which would put unnecessary pressure on the GC.

Synchronization

The [FUTURE] code above solves the blocking problem, but if I'm reading the Akka docs correctly, it would not be thread-safe. Would adding a synchronized block as in [ALTERNATIVE A] solve this? I would imagine that I only have to synchronize the actual update to the state, as in [ALTERNATIVE B]. Would I also have to do the same for reads of the state, as in [ALTERNATIVE C]?
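To make the difference between the alternatives concrete, here is a minimal sketch of [ALTERNATIVE B] combined with [ALTERNATIVE C], written without Akka so it only needs the standard library. The class name SnapshotHolder is hypothetical, the 300 ms sleep stands in for the real computation, and the sketch assumes only one compute() runs at a time (otherwise two runs could both start from the same previous value).

```scala
// Hypothetical holder: the slow work happens outside the lock,
// and the lock is held only for the short read or write of the state.
class SnapshotHolder {
  private var myvar = 1.0

  // [ALTERNATIVE C]: synchronized read, so readers see a fully published value
  def retrieve(): Double = this.synchronized { myvar }

  // [ALTERNATIVE B]: compute first, then take the lock only to swap in the result
  def compute(): Unit = {
    val previous = retrieve()      // short lock to read the current value
    Thread.sleep(300)              // stand-in for the slow work, no lock held
    val next = previous + 1.0
    this.synchronized { myvar = next }
  }
}
```

With [ALTERNATIVE A] the lock would be held for the whole computation, so a synchronized retrieve() would wait just as long as the blocking version does. In the form above, readers keep getting the old value until the swap. Note that this shares mutable state across threads through a lock, which works around the actor model instead of using it.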

Spray-cache

The spray-cache pattern seems to be built with a web serving use case in mind (small cached objects available with a key), so I'm not sure if it applies here.

Futures with pipeTo

I've seen examples of wrapping a long running computation in a Future and then piping that back to the same actor with pipeTo to update internal state.

The problem with this is: what if I want to update the mutable internal state of my actor during the long running computation?
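For reference, the pipeTo pattern mentioned above looks roughly like the sketch below. It assumes classic Akka actors (as used with Spray); the names PipingCompute, Updated, and slowCompute are hypothetical. The key point is that the Future only works on a local copy of the state and never touches the actor's fields, so all mutation still happens inside receive.

```scala
import akka.actor.{Actor, Status}
import akka.pattern.pipe
import scala.concurrent.Future

// Hypothetical message carrying a finished result back to the actor
final case class Updated(value: Double)

class PipingCompute extends Actor {
  // Execution context for the Future; a dedicated dispatcher may suit blocking work better
  import context.dispatcher

  private var current = 1.0      // only read or written inside receive
  private var computing = false  // guards against overlapping runs

  // Works only on its argument; never reads or writes actor fields
  private def slowCompute(previous: Double): Double = {
    Thread.sleep(3000)
    previous + 1.0
  }

  def receive = {
    case "compute" if !computing =>
      computing = true
      val snapshot = current  // copy to a local val before leaving the actor
      Future(Updated(slowCompute(snapshot))).pipeTo(self)
    case "compute" =>
      // a run is already in flight; ignore this request
    case Updated(value) =>
      current = value         // the swap happens on the actor's own thread
      computing = false
    case Status.Failure(_) =>
      computing = false       // pipeTo delivers a failed Future as Status.Failure
    case "retrieve" =>
      sender() ! current      // stale value is served while a run is in flight
  }
}
```

If intermediate state needs to be recorded during the run, one option under the same assumptions is to have the Future send additional (hypothetical) progress messages to self, so those updates are also applied inside receive.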

Does anyone have any thoughts or suggestions for this use case?

tl;dr:

I want my actor to update internal, mutable state during a long running computation without blocking. Ideas?

There is 1 answer

Anatoliy Kmetyuk On

So let the MyCompute actor create a Worker actor for each computation:

  1. A "compute" message arrives at MyCompute.
  2. MyCompute remembers the sender and spawns a Worker actor. It stores the pair in a Map[Worker, Sender].
  3. The Worker does the computation. When it finishes, it sends the result to MyCompute.
  4. MyCompute updates its result and looks up the original requester in the Map[Worker, Sender], using the completed Worker as the key. It then sends the result to the requester and terminates the Worker.
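The steps above could be sketched roughly as follows, assuming classic Akka actors. The message types Work and Result and the field name pending are hypothetical, and the Thread.sleep stands in for the real computation. The Worker still occupies a thread while it blocks, so running workers on a dedicated dispatcher may be worth considering.

```scala
import akka.actor.{Actor, ActorRef, Props}

// Hypothetical protocol between MyCompute and its workers
final case class Work(previous: Double)
final case class Result(value: Double)

class Worker extends Actor {
  def receive = {
    case Work(previous) =>
      Thread.sleep(3000)                 // blocks only this worker, not MyCompute
      sender() ! Result(previous + 1.0)  // step 3: report the result back
  }
}

class MyCompute extends Actor {
  private var myvar = 1.0
  // Step 2: worker -> original requester
  private var pending = Map.empty[ActorRef, ActorRef]

  def receive = {
    case "compute" =>
      val worker = context.actorOf(Props(classOf[Worker]))
      pending = pending.updated(worker, sender())
      worker ! Work(myvar)               // hand the worker a copy of the input
    case Result(value) =>
      val worker = sender()
      myvar = value                      // step 4: update state inside receive
      pending.get(worker).foreach(_ ! value)
      pending = pending - worker
      context.stop(worker)
    case "retrieve" =>
      sender() ! myvar                   // served immediately, even mid-computation
  }
}
```

When "compute" comes from the scheduler, as in the question's Boot object, there is no real requester, so the reply in step 4 simply goes to dead letters.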

Whenever you have blocking in an actor, spawn a dedicated actor to handle it. Whenever you need another thread or a Future in an actor, spawn a dedicated actor. Whenever you need to abstract away complexity in an actor, spawn another actor.