I have an RDD as follows:

[(((1, 2), 1.0), (2, 3)),
 (((1, 2), 1.0), (3, 4)),
 (((1, 2), 3.0), (2, 5)),
 (((2, 3), 1.0), (1, 2)),
 (((2, 3), 1.0), (3, 4)),
 (((2, 3), 'inf'), (2, 5)),
 (((3, 4), 1.0), (1, 2)),
 (((3, 4), 1.0), (2, 3)),
 (((3, 4), -1.0), (2, 5)),
 (((2, 5), 3.0), (1, 2)),
 (((2, 5), 'inf'), (2, 3)),
 (((2, 5), -1.0), (3, 4))]  

I want to derive another RDD from this one that contains only the key-value pairs whose key occurs more than once. That is, my output RDD should be:

[(((1, 2), 1.0), (2, 3)),
 (((1, 2), 1.0), (3, 4)),
 (((2, 3), 1.0), (1, 2)),
 (((2, 3), 1.0), (3, 4)),
 (((3, 4), 1.0), (1, 2)),
 (((3, 4), 1.0), (2, 3))]

I tried rdd.countByKey(), which returned the following dictionary:

defaultdict(int,
            {((1, 2), 1.0): 2,
             ((1, 2), 3.0): 1,
             ((2, 3), 1.0): 2,
             ((2, 3), 'inf'): 1,
             ((3, 4), 1.0): 2,
             ((3, 4), -1.0): 1,
             ((2, 5), 3.0): 1,
             ((2, 5), 'inf'): 1,
             ((2, 5), -1.0): 1})

My next step was to iterate over this dictionary so that I could keep only the items whose count is greater than 1, but I couldn't get the code to work. Can anyone help me with a solution to this problem?
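For reference, the dictionary approach described above can be finished roughly as follows. This is a minimal sketch, assuming rdd is the RDD shown above and that the number of distinct keys is small enough for countByKey() to return them all to the driver; the helper name keep_repeated_keys is hypothetical.

```python
def keep_repeated_keys(rdd):
    """Keep only the (key, value) pairs whose key occurs more than once.

    Hypothetical helper. rdd.countByKey() runs a Spark job and returns a
    dict of key -> count to the driver, so this assumes the number of
    distinct keys is small enough to fit in driver memory.
    """
    counts = rdd.countByKey()
    # Collect the keys that were seen more than once.
    repeated = {key for key, n in counts.items() if n > 1}
    # The set is captured by the lambda and shipped to the executors with
    # each task; filter() then drops every pair whose key is unique.
    return rdd.filter(lambda kv: kv[0] in repeated)
```

Usage: for pair in keep_repeated_keys(rdd).collect(): print(pair). Because filter() keeps the original order within each partition, no reshuffle happens here. If the set of repeated keys is large, wrapping it in sc.broadcast(...) and reading .value inside the lambda avoids re-sending it with every task.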

1 Answer

ollik1

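You can use groupByKey to collect the values for each key into a sequence, filter out the sequences with fewer than two values, and then flatten the remaining groups back into individual key-value pairs with flatMapValues: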

result = rdd.groupByKey().filter(lambda t: len(t[1]) >= 2).flatMapValues(lambda x: x)
for pair in result.collect(): print(pair)  # collect() so printing happens on the driver, not on the executors

output:

(((1, 2), 1.0), (2, 3))
(((1, 2), 1.0), (3, 4))
(((2, 3), 1.0), (1, 2))
(((2, 3), 1.0), (3, 4))
(((3, 4), 1.0), (1, 2))
(((3, 4), 1.0), (2, 3))
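groupByKey builds the full list of values for each key on a single executor, which can be costly when some keys have many values. One possible alternative, not part of the answer above, is to count each key with reduceByKey and join the qualifying keys back onto the original pairs. This is a sketch assuming the same rdd; the helper name keep_repeated_keys_join and its min_count parameter are hypothetical.

```python
from operator import add


def keep_repeated_keys_join(rdd, min_count=2):
    """Keep (key, value) pairs whose key appears at least min_count times.

    Hypothetical helper: an alternative to groupByKey that never builds
    a list of all values for a key.
    """
    # Count each key; reduceByKey combines partial counts map-side.
    counts = rdd.mapValues(lambda _: 1).reduceByKey(add)
    # Keep only the keys that reach the threshold.
    repeated = counts.filter(lambda kc: kc[1] >= min_count)
    # join yields (key, (value, count)); drop the count to restore (key, value).
    return rdd.join(repeated).mapValues(lambda vc: vc[0])
```

Note that join still shuffles the original pairs, so the saving is mainly in per-key memory rather than in shuffle volume, and the order of the resulting pairs is not guaranteed.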