flatMapping in scala/spark

Looking for some assistance with how to do something in Scala using Spark.

I have:

type DistanceMap = HashMap[(VertexId,String), Int]

this forms part of my data in the form of an RDD of:

org.apache.spark.rdd.RDD[(DistanceMap, String)] 

in short my dataset looks like this:

 ({(101,S)=3},piece_of_data_1)
 ({(101,S)=3},piece_of_data_2)
 ({(101,S)=1, (100,9)=2},piece_of_data_3)

What I want to do is flatMap my distance map (which I can do), but at the same time retain the associated string for each flattened DistanceMap entry. So my resulting data would look like this:

 ({(101,S)=3},piece_of_data_1)
 ({(101,S)=3},piece_of_data_2)
 ({(101,S)=1},piece_of_data_3)
 ({(109,S)=2},piece_of_data_3)

As mentioned I can flatMap the first part using:

x.flatMap(x => x._1).collect.foreach(println)

but am stuck on how I can retain the string from the second part of my original data.

There are 2 answers

marios On BEST ANSWER

This might work for you:

x.flatMap(x => x._1.map(y => (y,x._2)))

The idea is to convert from (Seq(a,b,c),Value) to Seq( (a,Value), (b, Value), (c, Value)).

The same works on plain Scala collections, so here is a simplified standalone example you can paste into the Scala REPL:

Seq((Seq("a","b","c"), 34), (Seq("r","t"), 2)).flatMap( x => x._1.map(y => (y,x._2)))

This results in:

res0: Seq[(String, Int)] = List((a,34), (b,34), (c,34), (r,2), (t,2))
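
Applied to the data shape in the question, the same pattern might look like the sketch below. It uses plain Scala collections rather than an RDD, so it runs without Spark. The assumptions here are mine: `VertexId` is aliased to `Long` (as GraphX defines it), `HashMap` is taken to be the immutable one, and the object name `FlattenWithLabel` and the sample rows are made up for illustration.

```scala
import scala.collection.immutable.HashMap

object FlattenWithLabel {
  // Assumption: VertexId is a Long, as in org.apache.spark.graphx.
  type VertexId = Long
  type DistanceMap = HashMap[(VertexId, String), Int]

  // Illustrative rows shaped like the question's data.
  val rows: Seq[(DistanceMap, String)] = Seq(
    (HashMap((101L, "S") -> 3), "piece_of_data_1"),
    (HashMap((101L, "S") -> 1, (100L, "9") -> 2), "piece_of_data_3")
  )

  // Pair every map entry with the string that accompanied its map.
  // toSeq avoids building an intermediate Map keyed by the entries.
  val flattened: Seq[(((VertexId, String), Int), String)] =
    rows.flatMap { case (distances, label) =>
      distances.toSeq.map(entry => (entry, label))
    }
}
```

Each element of `flattened` is a single `((vertexId, key), distance)` entry together with its original string. The order of entries coming out of one map is not guaranteed.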
vvladymyrov On

update

I have an alternative solution: swap key and value, apply the flatMapValues transformation, then swap key and value back, as in:

x.map(x => (x._2, x._1)).flatMapValues(x => x).map(x => (x._2, x._1))
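
To see the swap, flatten, swap steps without a Spark installation, here is a rough sketch on plain Scala collections. `flatMapValues` is a Spark pair-RDD method and does not exist on `Seq`, so the helper of the same name below is a hypothetical stand-in written for this example. The object name, the use of `Long` for the vertex id, and the sample rows are likewise assumptions.

```scala
object FlipFlattenFlip {
  // Hypothetical stand-in for Spark's pair-RDD flatMapValues:
  // expands each value into several values, repeating its key.
  def flatMapValues[K, V, U](pairs: Seq[(K, V)])(f: V => Iterable[U]): Seq[(K, U)] =
    pairs.flatMap { case (k, v) => f(v).map(u => (k, u)) }

  // Illustrative rows: (distance map, string); vertex ids assumed to be Long.
  val rows: Seq[(Map[(Long, String), Int], String)] = Seq(
    (Map((101L, "S") -> 3), "piece_of_data_1"),
    (Map((101L, "S") -> 1, (100L, "9") -> 2), "piece_of_data_3")
  )

  val flipped = rows.map(_.swap)                        // (string, map)
  val expanded = flatMapValues(flipped)(m => m.toSeq)   // (string, entry)
  val result = expanded.map(_.swap)                     // (entry, string)
}
```

On a real RDD the two `map(_.swap)` calls are extra passes over the data, so the single `flatMap` in the accepted answer is likely the simpler choice.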

previous version

I propose to add one preprocessing step (sorry I have no computer with scala interpreter in front of me till tomorrow to come up with working code).

  1. transform the pair RDD of (DistanceMap, String) into an RDD of lists of Tuple4: List((VertexId, String, Int, String), ...)
  2. apply flatMap to the result

Pseudocode:

// each row becomes a List of Tuple4: (VertexId, String, Int, String)
rdd.map { case (dm, s) => dm.toList.map { case ((id, key), dist) => (id, key, dist, s) } }
    .flatMap(x => x)