I have written a Pregel program in Spark that processes a graph, but it executes very slowly even on a small dataset. I have written Pregel programs before, but this one is unusually slow. My cluster consists of 2 workers, each with a Core i5 CPU and 6 GB of RAM. This is the code I have written with Pregel:
/**
 * Propagates weighted labels over `graph` with Pregel for at most `maxSteps` supersteps.
 *
 * Every vertex starts with a single label (its own id) with coefficient 1. In each
 * superstep a vertex receives the label maps of all its neighbours, re-scores every
 * received label, keeps the labels whose score reaches the threshold `p`, and
 * normalises the kept scores so that they sum to 1.
 *
 * Relies on names defined outside this function: `most_similar`, `broadcastVariable`,
 * `beta` and `p`.
 *
 * NOTE(review): `sendMessage` emits two full label maps on every edge in every
 * superstep, so no vertex ever becomes inactive and the job always runs all
 * `maxSteps` iterations while shuffling a hash map per edge direction. That is
 * presumably the dominant cost; consider sending only when the labels changed.
 *
 * @param graph    input graph; the original vertex attributes are discarded
 * @param maxSteps maximum number of Pregel iterations
 * @tparam VD vertex attribute type of the input graph
 * @tparam ED edge attribute type (carried through unchanged)
 * @return a graph whose vertex attribute maps label -> (normalised score, id of this vertex)
 */
def run[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int): Graph[mutable.HashMap[VertexId, (Double, VertexId)], ED] = {
  // label -> (coefficient, id of the vertex that holds the label)
  type Labels = mutable.HashMap[VertexId, (Double, VertexId)]
  // label -> every (coefficient, sender id) pair received for that label
  type Message = mutable.HashMap[VertexId, List[(Double, VertexId)]]

  // Seed: each vertex carries only its own id as a label, with coefficient 1.
  val seededGraph = graph.mapVertices { case (vid, _) =>
    mutable.HashMap[VertexId, (Double, VertexId)](vid -> (1D, vid))
  }

  /** Sends each endpoint's labels to the other endpoint, each value wrapped in a one-element list. */
  def sendMessage(e: EdgeTriplet[Labels, ED]): Iterator[(VertexId, Message)] = {
    val toSrc = e.dstAttr.map { case (label, entry) => (label, entry :: Nil) }
    val toDst = e.srcAttr.map { case (label, entry) => (label, entry :: Nil) }
    Iterator((e.srcId, toSrc), (e.dstId, toDst))
  }

  /** Unions two messages; for a label present in both, `count1`'s entries come first. */
  def mergeMessage(count1: Message, count2: Message): Message = {
    // A fresh map is built so that neither input message is mutated.
    val merged = new mutable.HashMap[VertexId, List[(Double, VertexId)]]
    merged ++= count1
    count2.foreach { case (label, entries) =>
      merged.update(label, merged.getOrElse(label, Nil) ::: entries)
    }
    merged
  }

  /** Re-scores the received labels for vertex `vid` and returns its new, normalised label map. */
  def vertexProgram(vid: VertexId, attr: Labels, message: Message): Labels = {
    if (message.isEmpty) attr // initial (empty) message: keep the seed labels
    else {
      // Both lookups depend only on `vid`, so do them once per vertex instead of once
      // per received label.
      // NOTE(review): `most_similar` is still scanned linearly on every call; if it is
      // a local collection of pairs, a Map keyed by vertex id would make this O(1).
      val mostSimilarNeighbor = most_similar.find(_._1 == vid).map(_._2)
      val neighborWeights = broadcastVariable.value(vid)

      val labelScores: mutable.HashMap[VertexId, Double] = message.map { case (label, entries) =>
        // Coefficient this label has at the most similar neighbour, or 0 when the vertex
        // has no most-similar entry or that neighbour did not send this label.
        val maxSimilarScore = mostSimilarNeighbor
          .flatMap(neighbor => entries.find(_._2 == neighbor))
          .map(_._1)
          .getOrElse(0D)

        val weightedSum = entries.foldLeft(0D) { case (acc, (coefficient, sender)) =>
          acc + coefficient * neighborWeights(sender)._2
        }

        // NOTE(review): kept exactly as the original `+=`, which evaluates to
        // (1 + beta) * sum + (1 - beta) * maxSimilar. Confirm that a plain
        // beta * sum + (1 - beta) * maxSimilar was not intended.
        val score = weightedSum + ((beta * weightedSum) + ((1 - beta) * maxSimilarScore))
        (label, score)
      }

      val maxScore = labelScores.values.max
      // label -> (raw score, score divided by the maximum score)
      val scored: mutable.HashMap[VertexId, (Double, Double)] =
        labelScores.map { case (label, score) => (label, (score, score / maxScore)) }

      // NOTE(review): the threshold is applied to the raw score, as in the original;
      // the ratio computed above is never read. Confirm whether `ratio >= p` was meant.
      val aboveThreshold = scored.filter { case (_, (score, _)) => score >= p }

      // If nothing passes the threshold, fall back to one (arbitrary) label.
      val selected = if (aboveThreshold.isEmpty) scored.take(1) else aboveThreshold

      // Normalise the kept raw scores so that they sum to 1.
      val rawScores = selected.map { case (label, (score, _)) => (label, score) }
      val total = rawScores.values.sum
      rawScores.map { case (label, score) => (label, (score / total, vid)) }
    }
  }

  val initialMessage = mutable.HashMap[VertexId, List[(Double, VertexId)]]()

  Pregel(seededGraph, initialMessage, maxIterations = maxSteps)(
    vprog = vertexProgram,
    sendMsg = sendMessage,
    mergeMsg = mergeMessage)
}
Can anyone explain what causes this slow execution? Is it because I have only 2 workers and Pregel performs a lot of message-passing and reduce operations, so that the performance of my code degrades badly on big datasets?