Why does my code take so long to execute in Spark Pregel?

I have written code with Pregel in Spark that processes a graph, but it runs very slowly even on a small dataset. I have written Pregel programs before, but this one is unusually slow. My cluster consists of 2 workers, each with a Core i5 CPU and 6 GB of RAM. This is the code I have written with Pregel:

def run[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int) = {

      val temp_graph = graph.mapVertices { case (vid, _) => mutable.HashMap[VertexId, (Double,VertexId)](vid -> (1,vid)) }

      def sendMessage(e: EdgeTriplet[mutable.HashMap[VertexId, (Double,VertexId)], ED]): Iterator[(VertexId, mutable.HashMap[VertexId, List[(Double, VertexId)]])] = {
        val msg1 = e.dstAttr.map{ case (k,v) => (k, List(v)) }
        val msg2 = e.srcAttr.map{ case (k,v) => (k, List(v)) }
        Iterator((e.srcId, msg1), (e.dstId, msg2))
      }

      def mergeMessage(count1: (mutable.HashMap[VertexId, List[(Double,VertexId)]]), count2: (mutable.HashMap[VertexId, List[(Double,VertexId)]]))= {

        val communityMap = new mutable.HashMap[VertexId, List[(Double, VertexId)]]

        (count1.keySet ++ count2.keySet).map(key => {

          val count1Val: List[(Double, VertexId)] = count1.getOrElse(key,Nil)
          val count2Val: List[(Double, VertexId)] = count2.getOrElse(key,Nil)

          val pp = List(count1Val:::count2Val).flatten
          communityMap += key-> pp
        })
        communityMap
      }

      def vertexProgram(vid: VertexId, attr: mutable.HashMap[VertexId,(Double, VertexId)], message: mutable.HashMap[VertexId, List[(Double, VertexId)]]) = {

        if (message.isEmpty) attr
        else {

          val labels_score: mutable.HashMap[VertexId, Double] = message.map {

            key =>

              var value_sum = 0D
              var maxSimilar_result = 0D

              val max_similar = most_similar.filter(x => x._1 == vid).headOption match {
                case Some(x) => x._2 // most similar neighbor
                // case _ => -1
              }


              if (key._2.exists(x=>x._2==max_similar)) {
                maxSimilar_result = key._2.filter(v => v._2 == max_similar).headOption match {
                  case Some(v) => v._1 // is the most similar node in the List?
                  // case _ => 0D
                }
              }
              else maxSimilar_result = 0D

              key._2.map {
                values =>

                  value_sum += values._1 * (broadcastVariable.value(vid)(values._2)._2)

              }
              value_sum += (beta*value_sum)+((1-beta)*maxSimilar_result)

              (key._1,value_sum) //label list
          }


          val max_value = labels_score.maxBy(x=>x._2)._2.toDouble
          val dividedByMax: mutable.Map[VertexId, (Double, Double)] = labels_score.map(x=>(x._1,(x._2,x._2/max_value))) // divide by maximum value

          val resultMap: mutable.HashMap[VertexId, (Double, Double)] = new mutable.HashMap[VertexId,(Double, Double)]

          dividedByMax.map{ row => // select labels more than threshold P = 0.75
            if (row._2._1 >= p) resultMap += row
          }


          val xx = if (resultMap.isEmpty) dividedByMax.take(1).asInstanceOf[mutable.HashMap[VertexId, (Double, Double)]]
          else resultMap


          val rr = xx.map(x=>(x._1,x._2._1))
          val max_for_normalize= rr.values.sum

          val res: mutable.HashMap[VertexId, (Double, VertexId)] = rr.map(x=>(x._1->(x._2/max_for_normalize,vid))) // Normalize labels

          res
        }
      }


      val initialMessage = mutable.HashMap[VertexId, List[(Double,VertexId)]]()

      val overlapCommunitiesGraph = Pregel(temp_graph, initialMessage, maxIterations = maxSteps)(
        vprog = vertexProgram,
        sendMsg = sendMessage,
        mergeMsg = mergeMessage)

      overlapCommunitiesGraph
    }

Can anyone explain what causes this slow execution? Is it because I have only 2 workers, and Pregel involves so much message passing and reducing that performance degrades badly on big datasets?
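
To make the merge step easier to time on its own, here is a minimal standalone sketch of mergeMessage that runs without Spark. It assumes VertexId is a plain Long (as GraphX defines it); the names MergeSketch and Msg are hypothetical and used only for illustration. It produces the same per-key lists as the version above, but skips the intermediate key set and the List(...).flatten step. Whether that makes a measurable difference in the full job is something I have not verified.

```scala
import scala.collection.mutable

object MergeSketch {
  // Assumption: same alias GraphX uses, declared here so the sketch needs no Spark dependency.
  type VertexId = Long

  // Hypothetical alias for the message type used in the code above.
  type Msg = mutable.HashMap[VertexId, List[(Double, VertexId)]]

  // Merges two messages key by key: count1's list first, then count2's.
  // Neither input map is mutated.
  def mergeMessage(count1: Msg, count2: Msg): Msg = {
    val merged = new mutable.HashMap[VertexId, List[(Double, VertexId)]]
    merged ++= count1 // copy every entry of count1
    count2.foreach { case (key, values) =>
      // Append count2's values after whatever count1 had for this key.
      merged.update(key, merged.getOrElse(key, Nil) ::: values)
    }
    merged
  }
}
```

Calling MergeSketch.mergeMessage in a loop with maps of realistic size would give a rough idea of how much time this step costs per superstep, independent of the cluster.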
