Are recursive computations with Apache Spark RDD possible?


I'm developing a chess engine using Scala and Apache Spark (and I need to stress that my sanity is not the topic of this question). My problem is that the Negamax algorithm is recursive in its essence, and when I try a naive approach:

class NegaMaxSparc(@transient val sc: SparkContext) extends Serializable  {
  val movesOrdering = new Ordering[Tuple2[Move, Double]]() {
    override def compare(x: (Move, Double), y: (Move, Double)): Int =
      Ordering[Double].compare(x._2, y._2)
  }

  def negaMaxSparkHelper(game: Game, color: PieceColor, depth: Int, previousMovesPar: RDD[Move]): (Move, Double) = {
    val board = game.board

    if (depth == 0) {
      (null, NegaMax.evaluateDefault(game, color))
    } else {
      val moves = board.possibleMovesForColor(color)
      val movesPar = previousMovesPar.context.parallelize(moves)

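      // Note: this closure is shipped to the executors, yet negaMaxSparkHelper
      // creates new RDDs (parallelize) and runs an action (min()); those are
      // RDD operations Spark only allows from the driver, hence the error below.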
      val moveMappingFunc = (m: Move) => {
        negaMaxSparkHelper(
          new Game(board.boardByMakingMove(m), color.oppositeColor, null),
          color.oppositeColor, depth - 1, movesPar)
      }
      val movesWithScorePar = movesPar.map(moveMappingFunc)
      val move = movesWithScorePar.min()(movesOrdering)

      (move._1, -move._2)
    }
  }

  def negaMaxSpark(game: Game, color: PieceColor, depth: Int): (Move, Double) = {
    if (depth == 0) {
      (null, NegaMax.evaluateDefault(game, color))
    } else {
      val movesPar = sc.parallelize(new Array[Move](0))

      negaMaxSparkHelper(game, color, depth, movesPar)
    }
  }
}

class NegaMaxSparkBot(val maxDepth: Int, sc: SparkContext) extends Bot {
  def nextMove(game: Game): Move = {
    val nms = new NegaMaxSparc(sc)
    nms.negaMaxSpark(game, game.colorToMove, maxDepth)._1
  }
}

I get:

org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
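
The same restriction can be reproduced in isolation with a minimal sketch along the lines of the example quoted in the error message (a standalone app with purely illustrative names, not part of my engine):

import org.apache.spark.{SparkConf, SparkContext}

// Minimal standalone reproduction of the SPARK-5063 restriction, modeled on
// the example in the error message above.
object NestedRddRepro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("repro").setMaster("local[*]"))
    val rdd1 = sc.parallelize(1 to 3)
    val rdd2 = sc.parallelize(4 to 6)

    // rdd2 is referenced inside a transformation of rdd1, so its count()
    // action would have to run on an executor; Spark rejects this at runtime.
    val invalid = rdd1.map(x => rdd2.count() * x)
    invalid.collect()   // throws the SparkException quoted above

    sc.stop()
  }
}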

The question is: can this algorithm be implemented recursively using Spark? If not, what is the proper Spark way to solve this problem?


There are 2 answers

Answer from lmm

This is a limitation that makes sense in terms of the implementation, but it can be a pain to work with.

You can try pulling the recursion out to the top level, so that it lives only in the "driver" code that creates and operates on RDDs. Something like:

def step(rdd: RDD[Move], limit: Int): RDD[Move] =
  if (limit == 0) rdd
  else {
    val newRdd = rdd.flatMap(...)
    step(newRdd, limit - 1)
  }

Alternatively, it's always possible to translate recursion into iteration by managing the "stack" explicitly by hand (although it may result in more cumbersome code), for example as in the sketch below.
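
A rough iterative sketch of the same idea, reusing the Move type from the question; expand stands in for whatever move-generation logic would go into the flatMap:

import org.apache.spark.rdd.RDD

// Iterative equivalent of the recursive `step` above; `expand` is a
// placeholder for the real move-generation logic.
def stepIterative(start: RDD[Move], limit: Int, expand: Move => Seq[Move]): RDD[Move] = {
  var rdd = start
  var remaining = limit
  while (remaining > 0) {
    // Each flatMap is still issued from the driver; only the work inside
    // `expand` runs on the executors.
    rdd = rdd.flatMap(expand)
    remaining -= 1
  }
  rdd
}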

Answer from Svend

Only the driver can launch computations on RDDs. The reason is that even though RDDs "feel" like regular collections of data, behind the scenes they are still distributed collections, so launching operations on them requires coordinating the execution of tasks across all the remote slaves, which Spark hides from us most of the time.

So recursing from the slaves, i.e. launching new distributed tasks dynamically from the slaves themselves, is not possible: only the driver can take care of such coordination.

Here's a possible alternative based on a simplification of your problem (if I understand it correctly). The idea is to successively build instances of Moves, each one representing the full sequence of Move steps from the initial state.

Each instance of Moves is able to transform itself into a set of Moves, each one corresponding to the same sequence of moves plus one possible next Move.

From there, the driver just has to successively flatMap the Moves as deep as we want, and the resulting RDD[Moves] will perform all the operations in parallel for us.

The downside of this approach is that all depth levels are kept synchronized, i.e. we have to compute all the moves at level n (i.e. the RDD[Moves] for level n) before moving on to the next one.

The code below is not tested and probably has flaws, but hopefully it gives an idea of how to approach the problem.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

/* one modification to the board */
case class Move(from: String, to: String)

case class PieceColor(color: String)

/* state of the game */ 
case class Board() {

    // TODO
    def possibleMovesForColor(color: PieceColor): Seq[Move] = 
        Move("here", "there") :: Move("there", "over there") :: Move("there", "here") :: Nil

    // TODO: compute a new instance of board here, based on current + this move
    def update(move: Move): Board = Board()
}


/** Solution, i.e. a sequence of moves*/ 
case class Moves(moves: Seq[Move], game: Board, color: PieceColor) {    
    lazy val score = NegaMax.evaluateDefault(game, color)

    /** @return all valid next Moves  */
    def nextPossibleMoves: Seq[Moves] =
        game.possibleMovesForColor(color).map { nextMove =>
            copy(moves = nextMove +: moves,
                 game = game.update(nextMove))
        }

}

/** Driver code: negaMax looks for the best next move from a given game state */
def negaMax(sc: SparkContext, game: Board, color: PieceColor, maxDepth: Int): Moves = {

    val initialSolution = Moves(Seq.empty[Move], game, color)

    val allPlays: RDD[Moves] =
        (1 to maxDepth).foldLeft(sc.parallelize(Seq(initialSolution))) {
            (rdd, _) => rdd.flatMap(_.nextPossibleMoves)
        }

    allPlays.reduce { case (m1, m2) => if (m1.score < m2.score) m1 else m2 }
}
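
Assuming negaMax and the case classes above are placed in an enclosing object, the driver could then call it along these lines (hypothetical usage sketch; the app name, master and depth are made up):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver wiring for the sketch above.
object NegaMaxDriver {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("negamax").setMaster("local[*]"))

    val best = negaMax(sc, Board(), PieceColor("white"), maxDepth = 3)
    println(s"best sequence: ${best.moves} with score ${best.score}")

    sc.stop()
  }
}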