Spark: cogroup of RDDs fails when a group is huge


Good afternoon! I have a problem:

val rdd1: RDD[(K, V)] = ...
val rdd2: RDD[(K, W)] = ...

I want to filter rdd1 and throw away all elements whose key does not appear in rdd2. I know two ways to do this.

First:

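// collect every distinct key of rdd2 to the driver, then keep the pairs of rdd1 whose key is in that set
val keySet = rdd2.map(_._1).distinct().collect().toSet
rdd1.filter { case (k, _) => keySet.contains(k) }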

It doesn't work because keySet is too large to fit into memory.

Another one:

rdd1.cogroup(rdd2)
  .filter(x => x._2._2.nonEmpty)
  .flatMap(x => x._2._1)

Here something goes wrong, and I get two kinds of errors (at different places in the code): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE and java.lang.ArrayIndexOutOfBoundsException

I think that's because my groups are too large.

So how can I fix it? Is there a common method for fixing this kind of problem?


There are 2 answers

Answer by rishi

Have you considered using subtractByKey?

Something along the lines of:

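// rdd1 is already an RDD[(K, V)], so it can be passed to subtractByKey directly;
// the result keeps the pairs of rdd1 whose key does NOT appear in rdd2
rdd1.subtractByKey(rdd2)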

Answer by threecuptea

Consider rdd1.subtractByKey(rdd1.subtractByKey(rdd2)). The inner rdd1.subtractByKey(rdd2) returns the elements of rdd1 whose keys are not in rdd2, which is the opposite of what you want. Subtracting those from rdd1 with a second subtractByKey leaves exactly what you want: the elements of rdd1 whose keys do appear in rdd2.
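A quick way to check the double subtraction is to model it on plain Scala collections. The object SubtractByKeyModel and its functions subtractByKey and keepKeysPresentIn below are hypothetical local stand-ins that mimic the semantics of Spark's subtractByKey on (key, value) pairs. They are not Spark code, and the in-memory key set they build is only for illustration; on a cluster, subtractByKey runs as a distributed operation and does not collect the keys to the driver.

```scala
// Hypothetical local model of Spark's subtractByKey semantics on plain Scala collections.
// Not Spark code: it only illustrates the set logic of rdd1.subtractByKey(rdd1.subtractByKey(rdd2)).
object SubtractByKeyModel {

  // Keep the pairs of `left` whose key never appears in `right`
  // (the same result subtractByKey produces on pair RDDs).
  def subtractByKey[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, V)] = {
    val rightKeys = right.map(_._1).toSet // illustration only; Spark does not build this on the driver
    left.filterNot { case (k, _) => rightKeys.contains(k) }
  }

  // The inner call keeps rdd1's pairs whose key is NOT in rdd2;
  // subtracting those from rdd1 leaves exactly the pairs whose key IS in rdd2.
  def keepKeysPresentIn[K, V, W](rdd1: Seq[(K, V)], rdd2: Seq[(K, W)]): Seq[(K, V)] =
    subtractByKey(rdd1, subtractByKey(rdd1, rdd2))

  def main(args: Array[String]): Unit = {
    val rdd1 = Seq("a" -> 1, "a" -> 2, "b" -> 3, "c" -> 4)
    val rdd2 = Seq("a" -> "x", "c" -> "y", "d" -> "z")
    println(keepKeysPresentIn(rdd1, rdd2)) // List((a,1), (a,2), (c,4))
  }
}
```

Note that duplicate keys in rdd1 are all kept when their key is present in rdd2, and keys that exist only in rdd2 (like "d" above) contribute nothing to the result.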