Convert JavaPairRDD to DataFrame in Spark Java API

I am using Spark 1.6 with Java 7

I have a pair RDD:

JavaPairRDD<String, String> filesRDD = sc.wholeTextFiles(args[0]);

I want to convert it into a DataFrame with a schema.

It seems that I first have to convert the pair RDD to a JavaRDD<Row>.

So how do I create a JavaRDD<Row> from a JavaPairRDD?

There are 2 answers

abaghel (best answer)

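For Java 7 you need to define the map function as an anonymous class, since lambda expressions are only available from Java 8:

public static final Function<Tuple2<String, String>, Row> mappingFunc = new Function<Tuple2<String, String>, Row>() {
    @Override
    public Row call(Tuple2<String, String> tuple) {
        return RowFactory.create(tuple._1(), tuple._2());
    }
};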

Now pass this function to map to get a JavaRDD<Row>, which sqlContext.createDataFrame accepts together with a StructType schema:

JavaRDD<Row> rowRDD = filesRDD.map(mappingFunc);

With Java 8 this reduces to a lambda:

JavaRDD<Row> rowRDD = filesRDD.map(tuple -> RowFactory.create(tuple._1(),tuple._2()));

Another way to get a DataFrame from a JavaPairRDD is to go through a Dataset of tuples (the resulting columns are named _1 and _2):

DataFrame df = sqlContext.createDataset(JavaPairRDD.toRDD(filesRDD), Encoders.tuple(Encoders.STRING(),Encoders.STRING())).toDF();

VenkatN

Below is one way you can achieve this.

    //Read whole files
    JavaPairRDD<String, String> pairRDD = sparkContext.wholeTextFiles(path);

    //create a structType for creating the dataframe later. You might want to
    //do this in a different way if your schema is big/complicated. For the sake of this
    //example I took a simple one.
    StructType structType = DataTypes
            .createStructType(
                    new StructField[]{
                            DataTypes.createStructField("id", DataTypes.StringType, true)
                            , DataTypes.createStructField("name", DataTypes.StringType, true)});


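    //create an RDD<Row> from pairRDD: one Row per comma-separated line of each file.
    //Note: FlatMapFunction.call returns an Iterable in Spark 1.x (an Iterator from Spark 2.0 on).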
    JavaRDD<Row> rowJavaRDD = pairRDD.values().flatMap(new FlatMapFunction<String, Row>() {
        public Iterable<Row> call(String s) throws Exception {
            List<Row> rows = new ArrayList<Row>();
            for (String line : s.split("\n")) {
                String[] values = line.split(",");
                Row row = RowFactory.create(values[0], values[1]);
                rows.add(row);
            }
            return rows;
        }
    });


    //Create the DataFrame from the rows and the schema.
    DataFrame df = sqlContext.createDataFrame(rowJavaRDD, structType);

Sample data I used:
File1:

1, john  
2, steve

File2:

3, Mike  
4, Mary  

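Output from df.show() (the name values keep the leading space that follows each comma, because the code does not trim them):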

+---+------+
| id|  name|
+---+------+
|  1|  john|
|  2| steve|
|  3|  Mike|
|  4|  Mary|
+---+------+
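
The flatMap above indexes values[1] directly, so a blank line or a line without a comma in any input file would throw an ArrayIndexOutOfBoundsException, and the fields keep their surrounding whitespace. As a rough sketch of one way to harden that step, the parsing can be pulled into a small plain-Java helper. PairLineParser and its parse method are hypothetical names introduced here for illustration; they are not part of Spark or of the answer above, and skipping malformed lines is an assumption about what you want.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not part of Spark. Java 7 compatible. */
final class PairLineParser {

    private PairLineParser() {
    }

    /**
     * Splits the content of one file into (id, name) pairs.
     * Lines without a comma (including blank lines) are skipped,
     * and both fields are trimmed.
     */
    static List<String[]> parse(String fileContent) {
        List<String[]> pairs = new ArrayList<String[]>();
        // "\\r?\\n" also copes with Windows line endings.
        for (String line : fileContent.split("\\r?\\n")) {
            // Limit 2: any further commas stay inside the second field.
            String[] values = line.split(",", 2);
            if (values.length < 2) {
                continue; // blank or malformed line
            }
            pairs.add(new String[] {values[0].trim(), values[1].trim()});
        }
        return pairs;
    }
}
```

Inside call(String s), the loop would then become: for (String[] p : PairLineParser.parse(s)) { rows.add(RowFactory.create(p[0], p[1])); }. Note that with trimming in place the values no longer carry the leading space seen in the output above.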