Is it possible to await for second response in Scala

270 views Asked by At

Let's say I have actor A. A expects to receive message and once it receives a message it sends two messages back.

A extends Actor {
  def receive: Receive = {
    case M1 =>
      context.sender ! M2
      context.sender ! M3
  }
}

And in actor A I want to send a message and then await for two responses. I know that's easy for one response in a way like

val task = A ? M1
Await.result(task, timeout)

but I'm not sure whether it is possible with two sequential messages.

It is important to send two separate messages as I need to Await only first of them in another place.

2

There are 2 answers

0
Akos Krivachy On BEST ANSWER

You can solve this problem by introducing an intermediate Actor in the cases when you do need to wait for both messages.

This actor would look something like this:

class AggregationActor(aActor: ActorRef) extends Actor {

  var awaitForM2: Option[M2] = None
  var awaitForM3: Option[M3] = None
  var originalSender: Option[ActorRef] = None

  def receive: Receive = {
    case M1 =>
      // We save the sender
      originalSender = Some(sender())
      // Proxy the message
      aActor ! M1
    case M2 =>
      awaitForM2 = Some(M2)
      checkIfBothMessagesHaveArrived()
    case M3 =>
      awaitForM3 = Some(M3)
      checkIfBothMessagesHaveArrived()
  }

  private def checkIfBothMessagesHaveArrived() = {
    for {
      m2 <- awaitForM2
      m3 <- awaitForM3
      s <- originalSender
    } {
      // Send as a tuple
      s ! (m2, m3)
      // Shutdown, our task is done
      context.stop(self)
    }
  }

}

Essentially it has internal state and keeps track of how M1 and M2 responses are arriving.

You could use this like:

def awaitBothMessages(input: M1, underlyingAActor: ActorRef, system: ActorSystem): Future[(M2, M3)] = {
  val aggregationActor = system.actorOf(Props(new AggregationActor(aActor)))
  (aggregationActor ? input).mapTo[(M2, M3)]
}

val system = ActorSystem("test")
val aActor = system.actorOf(Props(new A), name = "aActor")

// Awaiting the first message only:
val firstMessage = aActor ? M1
val first = Await.result(firstMessage, Duration.Inf)

// Awaiting both messages:
val bothMessages: Future[(M2, M3)] = awaitBothMessages(M1, aActor, system)
val both = Await.result(firstMessage, Duration.Inf)
1
Thiago Pereira On

How about return to the sender a tuple containing M2 and M3?

import akka.pattern.ask
import akka.actor.{Props, ActorSystem, Actor}
import akka.util.Timeout
import com.test.A.{M1, M2, M3}

import scala.concurrent.Await
import scala.concurrent.duration._

object Test extends App {

  implicit val timeout = Timeout(5 seconds)

  val system = ActorSystem("test-system")
  val actor = system.actorOf(Props[A], name = "a-actor")
  val future = actor ? M1
  val await = Await.result(future, Duration.Inf)
  println(await)

}

class A extends Actor {
  override def receive: Receive = {
    case M1 => sender() ! (M2, M3)
  }
}

object A {
  case object M1
  case object M2
  case object M3
}

Running this will result in:

(M2,M3)