Good afternoon! I have a problem:
val rdd1: RDD[(key, value)] = ...
val rdd2: RDD[(key, othervalue)] = ...
I want to filter rdd1 and throw away all elements whose keys are not in rdd2. I know two ways to do this.
First:
val keySet = rdd2.keys.distinct().collect().toSet
rdd1.filter(x => keySet.contains(x._1))
This doesn't work because keySet is large and doesn't fit into memory.
Another one:
rdd1.cogroup(rdd2)
.filter(x => x._2._2.nonEmpty)
.flatMap(x => x._2._1)
Here something goes wrong and I get two kinds of errors (in different places in the code): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE and java.lang.ArrayIndexOutOfBoundsException.
I think that's because my groups are too large.
So how can I fix it? Is there a common method for fixing this kind of problem?
Have you considered using subtractByKey? Something along the lines of
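Note that subtractByKey removes the pairs whose key is present in the other RDD, which is the opposite of what the question asks for, so one way to use it is to apply it twice: first to find the pairs of rdd1 that have no match in rdd2, then to remove those from rdd1. The following is only a minimal sketch under that assumption. The object name KeepMatchingKeys, the helper keepKeysIn and the sample data are hypothetical, and whether this avoids the memory errors on the real data would need to be measured.

```scala
import scala.reflect.ClassTag
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object KeepMatchingKeys {
  // Hypothetical helper: keeps the pairs of rdd1 whose key occurs in rdd2.
  def keepKeysIn[K: ClassTag, V: ClassTag, W: ClassTag](
      rdd1: RDD[(K, V)],
      rdd2: RDD[(K, W)]): RDD[(K, V)] = {
    // Pairs of rdd1 whose key does NOT occur in rdd2.
    val unmatched = rdd1.subtractByKey(rdd2)
    // Removing those leaves only the pairs whose key does occur in rdd2.
    rdd1.subtractByKey(unmatched)
  }

  def main(args: Array[String]): Unit = {
    // Local master and tiny sample data, for illustration only.
    val conf = new SparkConf().setMaster("local[2]").setAppName("keep-matching-keys")
    val sc = new SparkContext(conf)
    try {
      val rdd1 = sc.parallelize(Seq(1 -> "a", 2 -> "b", 2 -> "c", 3 -> "d"))
      val rdd2 = sc.parallelize(Seq(2 -> 10.0, 3 -> 20.0, 3 -> 30.0, 4 -> 40.0))
      // Keys 2 and 3 occur in rdd2, so the pair with key 1 is dropped.
      keepKeysIn(rdd1, rdd2).collect().sorted.foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Because rdd1 is read twice in this sketch, it may be worth persisting it before the call if it is expensive to recompute.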