I am trying to implement the algorithm from Rocha & Thatte (http://cdsid.org.br/sbpo2015/wp-content/uploads/2015/08/142825.pdf) with PySpark and the Pregel wrapper from GraphFrames. I am stuck on the correct syntax for the message aggregation.
The idea is straightforward:
...In each pass, each active vertex of G sends a set of sequences of vertices to its out-neighbours as described next. In the first pass, each vertex v sends the message (v) to all its out-neighbours. In subsequent iterations, each active vertex v appends v to each sequence it received in the previous iteration. It then sends all the updated sequences to its out-neighbours. If v has not received any message in the previous iteration, then v deactivates itself. The algorithm terminates when all the vertices have been deactivated. ...
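The pass structure in the quoted excerpt can be prototyped in plain Python before moving it to Spark. This is a minimal sketch. The quoted passage does not say how a vertex recognises a cycle, so the sketch assumes the following rules:

- A vertex that receives a sequence starting with itself has closed a cycle. It reports the cycle only if it has the smallest id in that sequence, so each cycle is listed once.
- A sequence that already contains the vertex somewhere else is dropped.

`rocha_thatte` is a hypothetical helper name, not part of any library.

```python
from collections import defaultdict


def rocha_thatte(edges):
    """Sequential simulation of the Rocha-Thatte passes; returns the simple cycles."""
    out = defaultdict(list)
    vertices = set()
    for src, dst in edges:
        out[src].append(dst)
        vertices.update((src, dst))

    # Pass 1: every vertex v sends the sequence (v) to all its out-neighbours.
    inbox = defaultdict(list)
    for v in vertices:
        for w in out[v]:
            inbox[w].append((v,))

    cycles = []
    # Only vertices that received something in the previous pass stay active;
    # the loop ends once no vertex received a message.
    while inbox:
        outbox = defaultdict(list)
        for v, sequences in inbox.items():
            for seq in sequences:
                if seq[0] == v:
                    # The sequence came back to its first vertex: a cycle.
                    # Assumed rule: report it only from its smallest id (any
                    # total order works) so each cycle is listed once.
                    if v == min(seq):
                        cycles.append(list(seq))
                elif v not in seq:
                    # Append v and forward the extended sequence.
                    for w in out[v]:
                        outbox[w].append(seq + (v,))
                # Otherwise v already occurs inside seq: drop it.
        inbox = outbox
    return cycles
```

On the SimpleCycle edges from the code below, this returns `[['2', '3', '4', '5']]`. The ids are strings there, so `min` compares them lexicographically. That is fine, because the rule only needs a consistent order.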
My idea is to send the vertex ids to the destination vertices (dst) and collect them into a list in the aggregation function. Then I would like to merge the items of this new list into my existing vertex column "sequence". I would then use a when statement to check whether the current vertex id is already in the sequence. If it is, I could set the corresponding vertex column to true to flag the vertex as part of a cycle. But I can't find the correct Spark syntax for this concatenation. Does anyone have an idea, or has anyone implemented something similar?
My current code
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.functions import col, lit, when
from graphframes import GraphFrame
# Pregel provides the column helpers Pregel.msg() and Pregel.src()
from graphframes.lib import Pregel
SimpleCycle=[
("1","2"),
("2","3"),
("3","4"),
("4","5"),
("5","2"),
("5","6")
]
spark = SparkSession.builder.getOrCreate()
edges = spark.createDataFrame(SimpleCycle, ["src", "dst"]) \
    .withColumn("self_loop", col("src") == col("dst"))
edges.show()
+---+---+---------+
|src|dst|self_loop|
+---+---+---------+
| 1| 2| false|
| 2| 3| false|
| 3| 4| false|
| 4| 5| false|
| 5| 2| false|
| 5| 6| false|
+---+---+---------+
vertices = edges.select("src").union(edges.select("dst")).distinct().withColumnRenamed("src", "id")
#vertices = spark.createDataFrame([[1], [2], [3], [4],[5],[6],[7],[8],[9]], ["id"])
#vertices.sort("id").show()
graph = GraphFrame(vertices, edges)
cycles=graph.pregel \
.setMaxIter(5) \
.withVertexColumn("is_cycle", lit(""),lit("logic to be added")) \
.withVertexColumn("sequence", lit(""),Pregel.msg()) \
.sendMsgToDst(Pregel.src("id")) \
.aggMsgs(f.collect_list(Pregel.msg())) \
.run()
cycles.show()
+---+-----------------+--------+
| id| is_cycle|sequence|
+---+-----------------+--------+
| 3|logic to be added| [2]|
| 5|logic to be added| [4]|
| 6|logic to be added| [5]|
| 1|logic to be added| null|
| 4|logic to be added| [3]|
| 2|logic to be added| [5, 1]|
+---+-----------------+--------+
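This sketch explains why the current run shows only the direct predecessors of each vertex:

- `sendMsgToDst(Pregel.src("id"))` sends each source id along every edge.
- `collect_list` groups those ids per destination.
- The update expression `Pregel.msg()` overwrites `sequence` with that list in every superstep.

So the column ends up holding only the in-neighbours. Vertex 1 has no incoming edge and gets null. The emulation of one superstep is plain Python, and `in_neighbours` is a hypothetical name.

```python
from collections import defaultdict


def in_neighbours(edges, vertices):
    """Emulate one superstep of the question's Pregel call in plain Python.

    Every edge sends Pregel.src("id") to its dst, collect_list gathers the ids per
    destination, and the update expression Pregel.msg() replaces 'sequence' with
    that list. Vertices without incoming messages end up with None (null).
    Spark's collect_list gives no ordering guarantee, so the lists are sorted here.
    """
    inbox = defaultdict(list)
    for src, dst in edges:
        inbox[dst].append(src)
    return {v: sorted(inbox[v]) if v in inbox else None for v in vertices}
```

For the SimpleCycle edges and vertices "1" to "6", the result is `{'1': None, '2': ['1', '5'], '3': ['2'], '4': ['3'], '5': ['4'], '6': ['5']}`. This matches the table above up to ordering. The message depends only on `id`, never on the current `sequence`, so repeating the superstep (`setMaxIter(5)`) gives the same result every time.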
Code that does not work, but shows what I think the logic should be:
# Pseudocode: Append_To_Existing_List stands for the merge step I can't express in Spark
cycles = graph.pregel \
    .setMaxIter(5) \
    .withVertexColumn("is_cycle", lit(""),
                      when(Pregel.src("id").isin(Pregel.src("sequence")), True).otherwise(False)) \
    .withVertexColumn("sequence", lit("null"), Append_To_Existing_List(Pregel.msg())) \
    .sendMsgToDst(
        when(Pregel.src("sequence").isNull(), Pregel.src("id"))
        .otherwise(Pregel.src("sequence"))) \
    .aggMsgs(f.collect_list(Pregel.msg())) \
    .run()
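The logic aimed at in the pseudocode can be checked in plain Python first. This minimal sketch makes two assumptions. First, the message is the sender's whole current `sequence`, not just its id. Second, `sequence` starts as `[id]` instead of null. `flag_cycle_vertices` is a hypothetical name.

In PySpark, the individual steps correspond roughly to `f.flatten`, `f.concat`, `f.array_distinct` and `f.array_contains` (all in `pyspark.sql.functions` since Spark 2.4). In the GraphFrames Pregel API, the update expression in `withVertexColumn` is presumably evaluated per vertex, against the vertex's own columns and `Pregel.msg()`. `Pregel.src(...)` belongs in the `sendMsgToDst` expression. The sketch has not been run as a Pregel job.

```python
def flag_cycle_vertices(edges):
    """Plain-Python check of the update rule the pseudocode is aiming for.

    sequence[v] starts as [v]. In each superstep every vertex sends its current
    sequence to its out-neighbours; the receiver flattens what it got (like
    collect_list + flatten), flags itself if its own id came back (like
    array_contains), and merges the new ids into its sequence (like concat +
    array_distinct). A vertex lies on a cycle iff its own id comes back to it.
    """
    vertices = sorted({v for edge in edges for v in edge})
    sequence = {v: [v] for v in vertices}
    is_cycle = {v: False for v in vertices}
    # A cycle has at most len(vertices) edges, so that many supersteps suffice.
    for _ in range(len(vertices)):
        incoming = {v: [] for v in vertices}
        for src, dst in edges:
            incoming[dst].extend(sequence[src])
        new_sequence = {}
        for v in vertices:
            is_cycle[v] = is_cycle[v] or v in incoming[v]
            # dict.fromkeys keeps the first occurrence of each id, in order.
            new_sequence[v] = list(dict.fromkeys(sequence[v] + incoming[v]))
        if new_sequence == sequence:
            break  # nothing new arrived anywhere, so further supersteps change nothing
        sequence = new_sequence
    return is_cycle, sequence
```

On SimpleCycle, this flags 2, 3, 4 and 5 and leaves 1 and 6 unflagged, as in the desired result. However, `sequence` ends up holding every vertex that can reach v, not just the cycle members. For vertex 2 it also contains 1. Separating individual cycles is what the per-path sequences of Rocha-Thatte are for.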
# I would like to have a result like
+---+-----------------+---------+
| id| is_cycle|sequence |
+---+-----------------+---------+
| 1|false | [1] |
| 2|true |[2,3,4,5]|
| 3|true |[2,3,4,5]|
| 4|true |[2,3,4,5]|
| 5|true |[2,3,4,5]|
| 6|false | null |
+---+-----------------+---------+
In the end, I implemented the Rocha-Thatte algorithm not via Pregel but with the underlying message aggregation function of GraphFrames/GraphX. In case someone is interested, I'd like to share the solution.
This solution works correctly and can handle very large graphs without failing. However, it gets quite slow when the cycles are long or the graph is large. I'm not sure how to improve this right now; possibly by using checkpoints or broadcasting in a smart way.
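One likely contributor to the slowdown is that every sequence in Rocha-Thatte is a simple path. The number of messages per pass therefore follows the number of simple paths of that length, which grows combinatorially on dense graphs. The count below illustrates this on complete directed graphs. It reuses the forwarding rule: a vertex extends a sequence only if the sequence does not already contain it. `messages_per_pass` is a hypothetical helper, and the count is a sketch, not a profile of the actual Spark job.

```python
from collections import defaultdict
from itertools import permutations


def messages_per_pass(edges):
    """Count the sequences sent in each Rocha-Thatte pass (sequential simulation)."""
    out = defaultdict(list)
    vertices = set()
    for src, dst in edges:
        out[src].append(dst)
        vertices.update((src, dst))

    # Pass 1: every vertex v sends (v) to each out-neighbour.
    inbox = defaultdict(list)
    for v in vertices:
        for w in out[v]:
            inbox[w].append((v,))
    counts = [sum(len(s) for s in inbox.values())]

    while inbox:
        outbox = defaultdict(list)
        for v, sequences in inbox.items():
            for seq in sequences:
                # Sequences that already contain v close a cycle or are dropped;
                # only the others are extended and forwarded.
                if v not in seq:
                    for w in out[v]:
                        outbox[w].append(seq + (v,))
        if outbox:
            counts.append(sum(len(s) for s in outbox.values()))
        inbox = outbox
    return counts


# Complete directed graph on 4 vertices: every ordered pair (u, v) with u != v.
print(messages_per_pass(list(permutations(range(4), 2))))
```

For 4 vertices this gives `[12, 36, 72, 72]`, and for 5 vertices `[20, 80, 240, 480, 480]`. In general, pass k on a complete directed graph with n vertices sends n!/(n-k)! * (n-1) sequences.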
Any input for improvement is welcome.
This function takes graphs like:
SimpleCycle:
NestedCycle: