Spark/Scala: Unable to convert an RDD to a DataFrame


I am new to Scala (2.11) and Spark (1.6.0) and am trying to convert an RDD to a DataFrame without the spark-csv package (for practice, but also because of some technical issues). After reading the Spark getting-started guide and all the related Stack Overflow posts, I cannot get any of the four methods below to work; only one other approach works for me, and I don't know why.

Any help with any of them would be amazing!

I have a simple table like this in a txt file:

Jorgito 10 1 Soltero
Juanito 20 2 Casado
Jaimito 30 3 Divociado

I write a few preliminaries:

import org.apache.spark.sql._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.SQLContext

// RDD_datos is the RDD[String] read from the txt file (e.g. with sc.textFile)
var RDD_filas = RDD_datos.map(_.split("\t"))

var esquema = new StructType()
  .add("Nombre", StringType)
  .add("Edad", IntegerType)
  .add("Hijos", IntegerType)
  .add("EC", StringType)

case class X(Nombre: String, Edad: Int, Hijos: Int, EC: String)

Then I apply all the methods I have seen, none of which works:

var DF_datos = RDD_filas.map({case Array(s0, s1, s2, s3) => X(s0, s1.trim.toInt, s2.trim.toInt, s3)}).toDF("Nombre","Edad","Hijos","EC")
var DF_datos2 = RDD_filas.map(p => X(p(0), p(1).trim.toInt,p(2).trim.toInt,p(3))).toDF("Nombre","Edad","Hijos","EC")
var DF_datos3 = RDD_filas.map(Array(s0, s1, s2, s3) => Array(s0, s1.trim.toInt, s2.trim.toInt, s3)).toDF("Nombre","Edad","Hijos","EC")
var DF_datos4 = sqlContext.createDataFrame(RDD_filas,esquema)

The first three methods let me create the DF and print its schema, but the columns don't have the header names (DF_datos.head() returns the first data row) and I get an error if I try DF_datos.show(). The strangest one (for me) is number 4, because it is supposed to be the most "canonical" way.

Only this worked for me:

var a = RDD_datos.map(_.split(" ")).take(3)
val rdd = sc.makeRDD(a)
val df = rdd.map {case Array(s0, s1, s2, s3) => X(s0, s1.toInt, s2.toInt, s3)}.toDF()

There are 2 answers

evan.oman:

In order to use sqlContext.createDataFrame you will need to have an RDD[Row] where the types of the entries in your rows correspond to the types in your schema. Thus you will need to convert some entries from String to Int when appropriate.

Here is an example:

scala> val data = sc.textFile("./junk/dat.txt")
data: org.apache.spark.rdd.RDD[String] = ./junk/dat.txt MapPartitionsRDD[20] at textFile at <console>:28

scala> data.foreach{println}
Jorgito 10 1 Soltero
Juanito 20 2 Casado
Jaimito 30 3 Divociado

scala> :pa
// Entering paste mode (ctrl-D to finish)

var esquema = new StructType()
.add("Nombre", StringType)
.add("Edad", IntegerType)
.add("Hijos",IntegerType)
.add("EC",StringType)

// Exiting paste mode, now interpreting.

esquema: org.apache.spark.sql.types.StructType = StructType(StructField(Nombre,StringType,true), StructField(Edad,IntegerType,true), StructField(Hijos,IntegerType,true), StructField(EC,StringType,true))

scala> val rowRDD = data.map(l => l.split(" ")).map{case Array(a,b,c,d) => Row(a, b.toInt, c.toInt, d)}
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[22] at map at <console>:30

scala> val df = sqlContext.createDataFrame(rowRDD, esquema)
df: org.apache.spark.sql.DataFrame = [Nombre: string, Edad: int ... 2 more fields]

scala> df.show
+-------+----+-----+---------+
| Nombre|Edad|Hijos|       EC|
+-------+----+-----+---------+
|Jorgito|  10|    1|  Soltero|
|Juanito|  20|    2|   Casado|
|Jaimito|  30|    3|Divociado|
+-------+----+-----+---------+

However, this is a lot of work, so I would recommend that you either (a) upgrade to Spark 2.0 and use the built-in DataFrameReader csv loader, or (b) look into spark-csv. In both cases you would simply set the delimiter to a space or "\t" as needed.
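For the spark-csv route on 1.6, a minimal sketch of what the read might look like (the file path is an assumption, and esquema is the schema defined in the question):

// Spark 1.6 + spark-csv: read the space-delimited file directly into a DataFrame
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("delimiter", " ")
  .schema(esquema)
  .load("/path/to/dat.txt")

Because the schema is supplied explicitly, the columns come out with the right names and types without any manual Row conversion.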

Sorter:

One approach is to add column headers to the csv using a StructType schema:

val df = spark.createDataFrame(rdd, structType)  
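A minimal sketch of what that line assumes (the names rdd and structType, the field names, and the space delimiter are taken from the question, and the path is an assumption): rdd must be an RDD[Row] whose entry types match the schema.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// Schema with the column names you want as headers
val structType = StructType(Seq(
  StructField("Nombre", StringType),
  StructField("Edad", IntegerType),
  StructField("Hijos", IntegerType),
  StructField("EC", StringType)))

// RDD[Row] with entry types matching the schema (String, Int, Int, String)
val rdd = spark.sparkContext.textFile("/path/to/csv")
  .map(_.split(" "))
  .map { case Array(n, e, h, ec) => Row(n, e.toInt, h.toInt, ec) }

val df = spark.createDataFrame(rdd, structType)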

If the csv already contains column headers, you can read it directly into a DataFrame:

val df = spark.read.option("headers",true).csv("/path/to/csv")

To convert the source file directly into a DataFrame with your custom delimiter (a space), set it explicitly:

val df = spark.read.option("delimiter", " ").csv("/path/to/csv")