Confused about stop condition on Spark/GraphX/Pregel example program to find 'path distance'

I am working my way through Spark GraphX in Action. The book (source code here: https://github.com/insidedctm/spark-graphx-in-action) discusses two ways of calculating the distance (number of edge hops) between the root of a tree and every node down to the leaves. I understand the code examples that use aggregateMessages. In particular, the stop condition makes sense (I have marked it below with a comment containing the text 'STOP CONDITION'): once the vertex attributes stop changing, there is no point in running the algorithm any further.

I was a little puzzled when I looked at the Pregel way of calculating the same result (shown below).

In particular, when Pregel's apply method is called, maxIterations is left at its default, Int.MaxValue, which for all practical purposes means 'run forever'. Therefore, it seems like the sendMsg function, which is:

    (et:EdgeTriplet[Int,String]) =>
        Iterator((et.dstId, et.srcAttr+1))

will be called forever, even after the values on the vertices have converged.

Is there some mechanism that I overlooked which causes the program to stop after convergence?

    // aggregateMessages approach
    // from: https://github.com/insidedctm/spark-graphx-in-action/blob/51e4c667b927466bd02a0a027ca36625b010e0d6/Chapter04/Listing4_10IteratedFurthestVertex.scala
    import org.apache.spark.graphx._

    // Each edge sends (source distance + 1) to its destination vertex
    def sendMsg(ec: EdgeContext[Int,String,Int]): Unit = {
      ec.sendToDst(ec.srcAttr+1)
    }

    // A vertex that receives several messages keeps the largest one
    def mergeMsg(a: Int, b: Int): Int = {
      math.max(a,b)
    }

    def propagateEdgeCount(g:Graph[Int,String]):Graph[Int,String] = {
      val verts = g.aggregateMessages[Int](sendMsg, mergeMsg)
      val g2 = Graph(verts, g.edges)
      // Sum of the per-vertex changes (new - old) between g and g2
      val check =
            g2.vertices.join(g.vertices).
               map(x => x._2._1 - x._2._2).
               reduce(_ + _)

      // STOP CONDITION
      // check here ensures stop if nothing changed  (******)
      if (check > 0)
        propagateEdgeCount(g2)
      else
        g
    }

    // Pregel approach
    import org.apache.spark.graphx._

    // myGraph is defined elsewhere (not shown); its edge attribute type must be String
    val g = Pregel(myGraph.mapVertices((vid,vd) => 0), 0,
                   activeDirection = EdgeDirection.Out)(
                   (id:VertexId,vd:Int,a:Int) => math.max(vd,a),
                   (et:EdgeTriplet[Int,String]) =>
                        Iterator((et.dstId, et.srcAttr+1)),
                   (a:Int,b:Int) => math.max(a,b))
    g.vertices.collect
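The loop that this Pregel call runs can be modelled on a single machine. The sketch below is plain Scala (no Spark) and assumes GraphX's documented Pregel semantics: an initial sendMsg pass over every edge, then supersteps in which only vertices that received a message run vprog and only edges next to those vertices run sendMsg (outOnly = true stands in for activeDirection = EdgeDirection.Out), with the loop ending as soon as a superstep sends no messages. The names pregel, Edge, Triplet and outOnly, and the five-vertex tree, are hypothetical illustrations, not GraphX API.

```scala
// Minimal single-machine model of the GraphX Pregel loop (illustrative, not Spark code).
final case class Edge(src: Long, dst: Long)
final case class Triplet[VD](srcId: Long, dstId: Long, srcAttr: VD, dstAttr: VD)

// Returns the final vertex attributes and the number of supersteps run.
def pregel[VD, A](
    vertices: Map[Long, VD],
    edges: Seq[Edge],
    initialMsg: A,
    maxIterations: Int = Int.MaxValue,
    outOnly: Boolean = false // true ~ EdgeDirection.Out, false ~ EdgeDirection.Either
)(
    vprog: (Long, VD, A) => VD,
    sendMsg: Triplet[VD] => Iterator[(Long, A)],
    mergeMsg: (A, A) => A
): (Map[Long, VD], Int) = {
  // Runs sendMsg on the given edges and merges the messages per destination vertex.
  def send(attrs: Map[Long, VD], activeEdges: Seq[Edge]): Map[Long, A] =
    activeEdges
      .flatMap(e => sendMsg(Triplet(e.src, e.dst, attrs(e.src), attrs(e.dst))))
      .foldLeft(Map.empty[Long, A]) { case (acc, (id, msg)) =>
        acc.updated(id, acc.get(id).fold(msg)(mergeMsg(_, msg)))
      }

  // Initial step: vprog with initialMsg on every vertex, sendMsg on every edge.
  var attrs = vertices.map { case (id, vd) => id -> vprog(id, vd, initialMsg) }
  var messages = send(attrs, edges)
  var i = 0
  // Stops as soon as a superstep sends no messages, or when maxIterations is reached.
  while (messages.nonEmpty && i < maxIterations) {
    // Only vertices that received a message run vprog ...
    attrs = attrs ++ messages.map { case (id, msg) => id -> vprog(id, attrs(id), msg) }
    // ... and only edges next to those vertices run sendMsg in the next round.
    val active = messages.keySet
    messages = send(attrs, edges.filter(e => active(e.src) || (!outOnly && active(e.dst))))
    i += 1
  }
  (attrs, i)
}

// Hypothetical tree: 1 -> 2, 1 -> 3, 2 -> 4, 4 -> 5 (vertex 1 is the root).
val tree = Seq(Edge(1, 2), Edge(1, 3), Edge(2, 4), Edge(4, 5))
val zeros = Map(1L -> 0, 2L -> 0, 3L -> 0, 4L -> 0, 5L -> 0)

// Same functions as the question's Pregel call, including the unconditional sendMsg.
val (depths, supersteps) = pregel(zeros, tree, 0, outOnly = true)(
  (id, vd, a) => math.max(vd, a),
  t => Iterator((t.dstId, t.srcAttr + 1)),
  (a, b) => math.max(a, b)
)
// depths: 1 -> 0, 2 -> 1, 3 -> 1, 4 -> 2, 5 -> 3; supersteps: 3
```

On this tree the set of vertices receiving messages moves down one level per superstep and is empty once it passes the leaves, so the loop ends after 3 supersteps even though maxIterations is Int.MaxValue. With a cycle in the graph, the same unconditional sendMsg would never run out of messages.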

Answer by lmnzh

  As far as I know, Pregel stops by itself once all the vertices halt, that is, once a superstep sends no messages at all.
  There are two ways to make all the vertices halt once their attributes no longer change:

  • 1. Make sending a message conditional: when the given condition is false, a vertex sends no message.

  • 2. Make all the vertices halt after a fixed number of iterations (for example with Pregel's maxIterations parameter): the send condition may still be true, but by then the vertices' attributes no longer change.

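    // Assumes initialGraph2 holds 0.0 at the start vertex and Double.PositiveInfinity elsewhere
    val bfs2 = initialGraph2.pregel(Double.PositiveInfinity)(
      // vprog: keep the smaller distance
      (id, attr, msg) => math.min(attr, msg),
      // sendMsg: only send from a reached vertex to a vertex not reached yet
      triplet => {
        if (triplet.srcAttr != Double.PositiveInfinity &&
            triplet.dstAttr == Double.PositiveInfinity)
          Iterator((triplet.dstId, triplet.srcAttr + 1))
        else
          Iterator.empty
      },
      // mergeMsg: keep the smaller distance
      (a, b) => math.min(a, b)).cache()

  "triplet.dstAttr == Double.PositiveInfinity" is the continue condition. Once every vertex reachable from the start vertex holds a value smaller than Double.PositiveInfinity, no more messages are sent, so all the vertices halt.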
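The two ways listed above can be compared with a single-machine model of the Pregel loop. This is plain Scala, not Spark; it assumes GraphX's documented semantics, and the names pregel, Edge, Triplet and outOnly, as well as the three-vertex graph, are hypothetical illustrations. On a graph with a cycle (1 -> 2, 2 -> 3, 3 -> 2), the question's unconditional sendMsg only stops because maxIterations is set, while the conditional sendMsg from the BFS code stops by itself. The BFS run leaves the direction at its default (outOnly = false), standing in for graph.pregel's default activeDirection of EdgeDirection.Either.

```scala
// Minimal single-machine model of the GraphX Pregel loop (illustrative, not Spark code).
final case class Edge(src: Long, dst: Long)
final case class Triplet[VD](srcId: Long, dstId: Long, srcAttr: VD, dstAttr: VD)

// Returns the final vertex attributes and the number of supersteps run.
def pregel[VD, A](
    vertices: Map[Long, VD],
    edges: Seq[Edge],
    initialMsg: A,
    maxIterations: Int = Int.MaxValue,
    outOnly: Boolean = false // true ~ EdgeDirection.Out, false ~ EdgeDirection.Either
)(
    vprog: (Long, VD, A) => VD,
    sendMsg: Triplet[VD] => Iterator[(Long, A)],
    mergeMsg: (A, A) => A
): (Map[Long, VD], Int) = {
  // Runs sendMsg on the given edges and merges the messages per destination vertex.
  def send(attrs: Map[Long, VD], activeEdges: Seq[Edge]): Map[Long, A] =
    activeEdges
      .flatMap(e => sendMsg(Triplet(e.src, e.dst, attrs(e.src), attrs(e.dst))))
      .foldLeft(Map.empty[Long, A]) { case (acc, (id, msg)) =>
        acc.updated(id, acc.get(id).fold(msg)(mergeMsg(_, msg)))
      }

  var attrs = vertices.map { case (id, vd) => id -> vprog(id, vd, initialMsg) }
  var messages = send(attrs, edges)
  var i = 0
  // Stops as soon as a superstep sends no messages, or when maxIterations is reached.
  while (messages.nonEmpty && i < maxIterations) {
    attrs = attrs ++ messages.map { case (id, msg) => id -> vprog(id, attrs(id), msg) }
    val active = messages.keySet
    messages = send(attrs, edges.filter(e => active(e.src) || (!outOnly && active(e.dst))))
    i += 1
  }
  (attrs, i)
}

// Hypothetical graph with a cycle between vertices 2 and 3.
val cyclic = Seq(Edge(1, 2), Edge(2, 3), Edge(3, 2))

// Way 2: unconditional sendMsg (as in the question); only maxIterations stops it.
val (capped, cappedSteps) = pregel(Map(1L -> 0, 2L -> 0, 3L -> 0), cyclic, 0,
    maxIterations = 5, outOnly = true)(
  (id, vd, a) => math.max(vd, a),
  t => Iterator((t.dstId, t.srcAttr + 1)),
  (a, b) => math.max(a, b)
)
// capped: 1 -> 0, 2 -> 5, 3 -> 5; cappedSteps: 5 (the values would keep growing)

// Way 1: conditional sendMsg (as in the BFS code); it stops by itself.
val inf = Double.PositiveInfinity
val (bfs, bfsSteps) = pregel(Map(1L -> 0.0, 2L -> inf, 3L -> inf), cyclic, inf)(
  (id, attr, msg) => math.min(attr, msg),
  t =>
    if (t.srcAttr != inf && t.dstAttr == inf) Iterator((t.dstId, t.srcAttr + 1))
    else Iterator.empty,
  (a, b) => math.min(a, b)
)
// bfs: 1 -> 0.0, 2 -> 1.0, 3 -> 2.0; bfsSteps: 2
```

The conditional version stops after 2 supersteps because no edge is left that leads from a reached vertex to an unreached one; the capped version would keep sending messages around the cycle without the maxIterations limit.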