How to make recursive calls within Behaviors.receive?

This code is from the Akka documentation. It implements an actor using the recommended functional style:

import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors

object Counter {
  sealed trait Command
  case object Increment extends Command
  final case class GetValue(replyTo: ActorRef[Value]) extends Command
  final case class Value(n: Int)

  def apply(): Behavior[Command] =
    counter(0)

  private def counter(n: Int): Behavior[Command] =
    Behaviors.receive { (context, message) =>
      message match {
        case Increment =>
          val newValue = n + 1
          context.log.debug("Incremented counter to [{}]", newValue)
          counter(newValue)
        case GetValue(replyTo) =>
          replyTo ! Value(n)
          Behaviors.same
      }
    }
}

The actor contains a recursive call, counter(newValue), to maintain mutable state by functional means. When I implement this and add the @tailrec annotation to the function, the Scala compiler complains that the call is not tail recursive, even though it seems to be in the last position. This means that sooner or later a StackOverflowError will occur (imagine you just want to count all incoming messages and there are billions of them: no JVM stack would be big enough).

Is it possible to make the call tail recursive, or do I have to fall back to the object-oriented style with mutable variables to handle those cases?

There is 1 answer

Answer by Levi Ramsey (best answer)

The short answer is that it's not recursive, because what counter is doing, ultimately, boils down to:

  • It creates an instance of a Function2[ActorContext[Command], Command, Behavior[Command]]
  • It passes that instance to Behaviors.receive, which uses it to construct a Behaviors.Receive[Command] object (which extends Behavior[Command])

To elaborate:

While this isn't the exact transformation performed by any recent Scala compiler, it should give you the flavor of why it's not recursive:

object Counter {
  // Protocol omitted
  class CounterFunction(n: Int) extends Function2[ActorContext[Command], Command, Behavior[Command]] {
    override def apply(context: ActorContext[Command], message: Command): Behavior[Command] =
      message match {
        case Increment =>
          // omitting logging, etc.
          counter(n + 1)
        case GetValue(replyTo) =>
          replyTo ! Value(n)
          Behaviors.same
      }
  }

  private def counter(n: Int): Behavior[Command] = {
    val f = new CounterFunction(n)
    Behaviors.receive(f)
  }
}

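Note that since the call to counter is wrapped inside CounterFunction's apply method, it doesn't happen until that apply is called, which isn't until a message is actually being processed. By then, the call to counter that created the CounterFunction has long since returned, so the stack never grows.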
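To see the deferral in isolation, here is a minimal sketch that doesn't depend on Akka. MiniBehavior and DeferredCallDemo are hypothetical names made up for this illustration, not anything from Akka's API. The sketch counts how often counter is entered: building the behavior enters it once, and the nested call only runs when a message is handed to the function.

```scala
// Hypothetical stand-in for Akka's Behavior; NOT the real API.
final case class MiniBehavior(onMessage: String => MiniBehavior)

object DeferredCallDemo {
  // Counts how many times counter() has been entered.
  var counterCalls = 0

  def counter(n: Int): MiniBehavior = {
    counterCalls += 1
    // The lambda body, including the nested counter(n + 1), is NOT run here.
    // counter() returns as soon as the MiniBehavior has been constructed.
    MiniBehavior(_ => counter(n + 1))
  }

  def main(args: Array[String]): Unit = {
    val b0 = counter(0)
    println(counterCalls) // 1: constructing the behavior did not recurse

    b0.onMessage("increment")
    println(counterCalls) // 2: the nested call ran only when a message arrived
  }
}
```

Because counter(0) has already returned before onMessage is invoked, the nested counter(1) call starts from whatever stack depth the caller of onMessage is at, not from inside counter(0).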

This will not overflow the stack, as can be seen with this minimal implementation of something that's not that different from the implementation deep within Akka's internals:

case class Behavior[T](
  processor: (ActorContext[T], T) => Behavior[T]
)

object Behavior {
  def processMsgs[T](b: Behavior[T], ctx: ActorContext[T])(msgs: List[T]): Behavior[T] =
    // No recursion here...
    msgs.foldLeft(b) { (behavior, m) => behavior.processor(ctx, m) }
}
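The sketch above can be turned into something runnable without Akka on the classpath. Everything here is an assumption for illustration: SimpleContext is an empty placeholder for ActorContext, SimpleBehavior mirrors the case class above under a different name, GetValue takes a plain callback instead of an ActorRef, and returning counter(n) stands in for Behaviors.same.

```scala
sealed trait Command
case object Increment extends Command
// Simplified: replies through a callback rather than an ActorRef.
final case class GetValue(replyTo: Int => Unit) extends Command

// Hypothetical empty placeholder for ActorContext.
final class SimpleContext

// Same shape as the Behavior case class above, renamed to avoid confusion with Akka's.
final case class SimpleBehavior[T](processor: (SimpleContext, T) => SimpleBehavior[T])

object TrampolineDemo {
  def counter(n: Int): SimpleBehavior[Command] =
    SimpleBehavior[Command] { (_, message) =>
      message match {
        case Increment => counter(n + 1)
        case GetValue(replyTo) =>
          replyTo(n)
          counter(n) // stands in for Behaviors.same
      }
    }

  // The loop: each step calls the current processor and keeps the behavior it returns.
  def processMsgs[T](b: SimpleBehavior[T], ctx: SimpleContext)(msgs: Iterator[T]): SimpleBehavior[T] =
    msgs.foldLeft(b) { (behavior, m) => behavior.processor(ctx, m) }

  // Sends `messages` Increment messages, then reads the count back.
  def run(messages: Int): Int = {
    val ctx = new SimpleContext
    val afterIncrements =
      processMsgs(counter(0), ctx)(Iterator.fill(messages)(Increment: Command))
    var result = -1
    processMsgs(afterIncrements, ctx)(Iterator.single(GetValue(r => { result = r }): Command))
    result
  }

  def main(args: Array[String]): Unit =
    println(run(1000000)) // prints 1000000, with no StackOverflowError
}
```

Each call to processor returns to the foldLeft loop before the next message is handled, so a million increments use the same stack depth as one.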

The Behavior.processMsgs function is an example of what's known (especially in the functional programming language implementation community) as a trampoline:

a loop that iteratively invokes [functions which return unevaluated function objects].... Programmers can use trampolined functions to implement tail-recursive function calls in stack-oriented programming languages.

In this particular case, the "unevaluated function object" is the processor in this sample implementation of Behavior.
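For comparison, the Scala standard library ships a general-purpose trampoline in scala.util.control.TailCalls. This is not what Akka uses internally; it is only meant to show the same idea outside the actor setting. The mutually recursive isEven/isOdd pair below would be rejected by @tailrec, yet it runs in constant stack space because each step returns a deferred call that the result loop then evaluates.

```scala
import scala.util.control.TailCalls.{TailRec, done, tailcall}

object TailCallsDemo {
  // Each branch returns either a finished value (done) or a deferred call (tailcall).
  def isEven(n: Int): TailRec[Boolean] =
    if (n == 0) done(true) else tailcall(isOdd(n - 1))

  def isOdd(n: Int): TailRec[Boolean] =
    if (n == 0) done(false) else tailcall(isEven(n - 1))

  def main(args: Array[String]): Unit =
    // .result runs the trampoline loop until a done(...) value is reached.
    println(isEven(1000000).result) // prints true
}
```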