Spray route get response from child actor


I am trying to figure out how to set up a master actor that calls the appropriate child actors, in support of some spray routes where I am emulating db calls. I am new to akka / spray, so I am just trying to understand how to properly wire up spray -> actors -> db calls (etc.). I can get the response back from the top-level actor, but when I try to get it back from an actor one level below the parent, I can't get anything to work.

Looking at the actor paths, it appears that the message from my spray route is sent from a temp actor. Below is what I have stubbed out so far. This has to be just user error / ignorance, but I am not sure how to proceed. Any suggestions would be appreciated.

The Demo Spray Service and Redis Actor code snippets below show where I am calling the actor from my route and the actors where I am having the issue (I want my route to get the response from SummaryActor). Thanks!

Boot:

object Boot extends App {

  // we need an ActorSystem to host our application in
  implicit val system = ActorSystem("on-spray-can")

  // create and start our service actor
  val service = system.actorOf(Props[DemoServiceActor], "demo-service")

  implicit val timeout = Timeout(5.seconds)
  // start a new HTTP server on port 8080 with our service actor as the handler
  IO(Http) ? Http.Bind(service, interface = "localhost", port = 8080)
}

Demo Service Actor (For Spray)

class DemoServiceActor extends Actor with Api {

  // the HttpService trait defines only one abstract member, which
  // connects the services environment to the enclosing actor or test
  def actorRefFactory = context

  // this actor only runs our route, but you could add
  // other things here, like request stream processing
  // or timeout handling
  def receive = handleTimeouts orElse runRoute(route)

  //Used to watch for request timeouts
  //http://spray.io/documentation/1.1.2/spray-routing/key-concepts/timeout-handling/
  def handleTimeouts: Receive = {
    case Timedout(x: HttpRequest) =>
      sender ! HttpResponse(StatusCodes.InternalServerError, "Too late")
  }


}

//Master trait for handling large APIs
//http://stackoverflow.com/questions/14653526/can-spray-io-routes-be-split-into-multiple-controllers
trait Api extends DemoService {
  val route = {
    messageApiRouting
  }
}

Demo Spray Service (Route):

trait DemoService extends HttpService with Actor  {
  implicit val timeout = Timeout(5 seconds) // needed for `?` below
  val redisActor = context.actorOf(Props[RedisActor], "redisactor")

  val messageApiRouting =
        path("summary" / Segment / Segment) { (dataset, timeslice) =>
          onComplete(getSummary(redisActor, dataset, timeslice)) {
            case Success(value) => complete(s"The result was $value")
            case Failure(ex) => complete(s"An error occurred: ${ex.getMessage}")
          }
        }

  def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] = Future {

    val dbMessage = DbMessage("summary", dataset + timeslice)
    val future = redisActor ? dbMessage
    val result = Await.result(future, timeout.duration).asInstanceOf[String]
    result
  }

}

Redis Actor (mock, no actual Redis client yet)

class RedisActor extends Actor with ActorLogging {
  //  val pool = REDIS
  implicit val timeout = Timeout(5 seconds) // needed for `?` below
  val summaryActor = context.actorOf(Props[SummaryActor], "summaryactor")


  def receive = {

    case msg: DbMessage => {
      msg.query match {
        case "summary" => {
          log.debug("Summary Query Request")
          log.debug(sender.path.toString)
           summaryActor ! msg
        }
      }
    }

    //If no match, log an error
    case other => log.error("Received unknown message: {}", other)
  }
}

class SummaryActor extends Actor with ActorLogging{

  def receive = {
    case msg: DbMessage =>{
      log.debug("Summary Actor Received Message")
      //Send back to Spray Route

    }
  }
}

There are 2 answers

Answer by cmbaxter (best answer)

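The first problem with your code is that RedisActor sends the message to its child with !, which makes RedisActor the sender, so the child cannot reply to the original asker. You need to forward from the master actor to the child so that the original sender is propagated and available for the child to respond to. So change this (in RedisActor):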

summaryActor ! msg

To:

summaryActor forward msg

That's the primary issue. Fix that and your code should start working, provided SummaryActor actually replies to its sender with the result. There is something else that needs attention though. Your getSummary method is currently defined as:

def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] = 
  Future {
    val dbMessage = DbMessage("summary", dataset + timeslice)
    val future = redisActor ? dbMessage
    val result = Await.result(future, timeout.duration).asInstanceOf[String]
    result
  }

The issue here is that the ask operation (?) already returns a Future, yet you block on it with Await to get the result and then wrap that in another Future so that you can return a Future for onComplete to work with. You should be able to simplify things by using the Future returned from ask directly, like so:

def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] = {
  val dbMessage = DbMessage("summary", dataset + timeslice)
  (redisActor ? dbMessage).mapTo[String]      
}
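Putting both changes together, a minimal sketch of the whole round trip might look like the following. It is only an illustration under some assumptions: the second field name of DbMessage (key) and the reply string are made up, the code targets Akka classic 2.4 or later (for system.terminate()), and the Await in run() is there purely so the demo can print a value; a spray route would hand the Future to onComplete instead.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical message shape; only the `query` field appears in the question.
case class DbMessage(query: String, key: String)

class SummaryActor extends Actor {
  def receive = {
    // Because the parent used `forward`, sender() is still the original asker.
    case DbMessage(_, key) => sender() ! s"summary for $key"
  }
}

class RedisActor extends Actor {
  val summaryActor: ActorRef = context.actorOf(Props[SummaryActor](), "summaryactor")

  def receive = {
    // forward keeps the original sender instead of replacing it with RedisActor
    case msg @ DbMessage("summary", _) => summaryActor forward msg
  }
}

object ForwardSketch {
  implicit val timeout: Timeout = Timeout(5.seconds) // needed for `?`

  // Non-blocking: return the Future produced by ask, narrowed to String.
  def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] =
    (redisActor ? DbMessage("summary", dataset + timeslice)).mapTo[String]

  // Demo driver only; blocking here is for illustration, not for use in a route.
  def run(): String = {
    val system = ActorSystem("forward-sketch")
    try {
      val redis = system.actorOf(Props[RedisActor](), "redisactor")
      Await.result(getSummary(redis, "sales", "2014"), 5.seconds)
    } finally system.terminate()
  }
}
```

If RedisActor used ! instead of forward here, the reply from SummaryActor would go to RedisActor, and the ask in getSummary would fail with a timeout.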
Answer by Spyros Lalos

Just an important comment on the above approaches.

Since getSummary(...) returns a Future[String] and you pass it to onComplete(...), you need an implicit ExecutionContext in scope, for example by importing:

import scala.concurrent.ExecutionContext.Implicits.global

That way the implicit ExecutionContext parameter required when working with the Future can be resolved.

** If you don't, you will end up with a type mismatch error, since onComplete(...) expects an OnCompleteFutureMagnet but was given a Future[String].
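To illustrate how the implicit ExecutionContext gets resolved, here is a small sketch that uses only scala.concurrent, with no actors or spray involved. The fakeSummary function and its return value are hypothetical stand-ins for getSummary.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object EcSketch {
  // Hypothetical stand-in for getSummary; it declares the implicit
  // ExecutionContext parameter that Future.apply needs.
  def fakeSummary(dataset: String, timeslice: String)(implicit ec: ExecutionContext): Future[String] =
    Future(s"summary for $dataset$timeslice")

  def run(): String = {
    // Without an implicit ExecutionContext in scope, the two calls below
    // (fakeSummary and map) would not compile.
    import ExecutionContext.Implicits.global
    val decorated = fakeSummary("sales", "2014").map(s => s"The result was $s")
    // Blocking only so the sketch can return a plain value.
    Await.result(decorated, 2.seconds)
  }
}
```

The same rule applies to any code that transforms or consumes the Future: each such call looks up an ExecutionContext implicitly, so one has to be visible at the call site.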