I have a connected graph like this:

user1|A,C,B
user2|A,E,B,A
user3|C,B,A,B,E
user4|A,C,B,E,B

where each user is a property on the edges and the listed sequence is the path that user followed. For example:

user1: A->C->B
user2: A->E->B->A
user3: C->B->A->B->E
user4: A->C->B->E->B

Now, I want to find all users who have reached E from A. The output should be user2, user3, user4 (since all of these users eventually reached E from A, no matter how many hops they took). How can I write the motif for this? This is what I tried:

val vertices = spark.createDataFrame(List(
  ("A", "Billing"),
  ("B", "Devices"),
  ("C", "Payment"),
  ("D", "Data"),
  ("E", "Help"))).toDF("id", "desc")

val edges = spark.createDataFrame(List(("A","C","user1"),
("C","B","user1"),
("A","E","user2"),
("E","B","user2"),
("B","A","user2"),
("C","B","user3"),
("B","A","user3"),
("A","B","user3"),
("B","E","user3"),
("A","C","user4"),
("C","B","user4"),
("B","E","user4"),
("E","B","user4"))).toDF("src","dst","user")

val pathAnalysis = GraphFrame(vertices, edges)
pathAnalysis
  .find("(a)-[]->();()-[]->();()-[]->(d)")
  .filter("a.id='A'")
  .filter("d.id='E'")
  .distinct()
  .show()

But I am getting an exception like this:

org.apache.spark.sql.AnalysisException: Detected implicit cartesian product for INNER join between logical plans
Join Inner
:- Project [a#355]
:  +- Join Inner, (__tmp-4363599943432734077#353.src = a#355.id)
:     :- LocalRelation [__tmp-4363599943432734077#353]
:     +- Project [named_struct(id, _1#0, desc, _2#1) AS a#355]
:        +- Filter (named_struct(id, _1#0, desc, _2#1).id = A)
:           +- LocalRelation [_1#0, _2#1]
+- LocalRelation
and
LocalRelation [__tmp-1043886091038848698#371]
Join condition is missing or trivial.
Either: use the CROSS JOIN syntax to allow cartesian products between these
relations, or: enable implicit cartesian products by setting the configuration
variable spark.sql.crossJoin.enabled=true;

I am not sure whether my condition is correct, or how to set the property spark.sql.crossJoin.enabled=true in a spark-shell.

I invoked my spark-shell as follows:

spark-shell --packages graphframes:graphframes:0.3.0-spark2.0-s_2.11
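For reference, this property can be set either when launching the shell or from inside a running session; a minimal sketch (the `--conf` flag and `spark.conf.set` are standard Spark 2.x mechanisms):

```scala
// At launch (shell command, shown here as a comment):
//   spark-shell --conf spark.sql.crossJoin.enabled=true \
//     --packages graphframes:graphframes:0.3.0-spark2.0-s_2.11
// Or from inside an already-running spark-shell session:
spark.conf.set("spark.sql.crossJoin.enabled", "true")
```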

2 Answers

Elior Malul (1 vote):

My suggested solution is fairly trivial, but it will work fine if the paths are relatively short and the number of users (i.e., the number of rows in the data set) is large. If this is not the case, let me know; other implementations are possible.

import spark.implicits._  // needed for .toDF() and the UserPath encoder

case class UserPath(
  userId: String,
  path: List[String])

val dsUsers = Seq(
  UserPath("user1", List("A", "B", "C")),
  UserPath("user2", List("A", "E", "B", "A")))
  .toDF().as[UserPath]

def pathExists(up: UserPath): Option[String] = {
  // Drop everything before the first "A"; the user qualifies if "E"
  // appears anywhere from that point on.
  val prefix = up.path.takeWhile(s => s != "A")
  val len = up.path.length
  if (up.path.takeRight(len - prefix.length).contains("E"))
    Some(up.userId)
  else
    None
}

// Users with a path from A -> E.
dsUsers.map(pathExists).filter(opt => !opt.isEmpty)
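The same cutoff logic can be checked on the question's four sample paths in plain Scala, without Spark (a standalone sketch; reachedAtoE is an illustrative helper, not part of the answer's code):

```scala
// A user qualifies if "E" appears at or after the first occurrence of "A"
// in that user's path -- the same idea as pathExists above.
def reachedAtoE(path: List[String]): Boolean = {
  val idx = path.indexOf("A")
  idx >= 0 && path.drop(idx).contains("E")
}

val paths = Map(
  "user1" -> List("A", "C", "B"),
  "user2" -> List("A", "E", "B", "A"),
  "user3" -> List("C", "B", "A", "B", "E"),
  "user4" -> List("A", "C", "B", "E", "B"))

val qualified = paths.collect { case (u, p) if reachedAtoE(p) => u }.toList.sorted
println(qualified)  // List(user2, user3, user4)
```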
Artem Aliev (0 votes):

You can also use the BFS algorithm for this: http://graphframes.github.io/graphframes/docs/_site/api/scala/index.html#org.graphframes.lib.BFS With your data model you will have to iterate over the users and run BFS for each of them, like this:

scala> pathAnalysis.bfs.fromExpr($"id" === "A").toExpr($"id" === "E").edgeFilter($"user" === "user3").run().show
+------------+-------------+------------+-------------+---------+
|        from|           e0|          v1|           e1|       to|
+------------+-------------+------------+-------------+---------+
|[A, Billing]|[A, B, user3]|[B, Devices]|[B, E, user3]|[E, Help]|
+------------+-------------+------------+-------------+---------+
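The per-user iteration this answer describes can be simulated in plain Scala: restrict the edge list to one user's edges, then run a small breadth-first search from A looking for E (an illustrative standalone sketch of the idea, not the GraphFrames API):

```scala
// Edge list from the question: (src, dst, user).
val edgeList = List(
  ("A", "C", "user1"), ("C", "B", "user1"),
  ("A", "E", "user2"), ("E", "B", "user2"), ("B", "A", "user2"),
  ("C", "B", "user3"), ("B", "A", "user3"), ("A", "B", "user3"), ("B", "E", "user3"),
  ("A", "C", "user4"), ("C", "B", "user4"), ("B", "E", "user4"), ("E", "B", "user4"))

// BFS over only this user's edges: expand the frontier level by level
// until `to` is visited or no new vertices remain.
def reaches(user: String, from: String, to: String): Boolean = {
  val adj = edgeList.collect { case (s, d, u) if u == user => (s, d) }
    .groupBy(_._1).map { case (k, v) => k -> v.map(_._2) }
  var frontier = Set(from)
  var visited = Set.empty[String]
  while (frontier.nonEmpty && !visited.contains(to)) {
    visited ++= frontier
    frontier = frontier.flatMap(v => adj.getOrElse(v, Nil)).diff(visited)
  }
  visited.contains(to)
}

val users = edgeList.map(_._3).distinct.sorted
val reached = users.filter(u => reaches(u, "A", "E"))
println(reached)  // List(user2, user3, user4)
```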