I am new to Scala (2.11) and Spark (1.6.0), and I am trying to convert an RDD to a DataFrame without the spark-csv package (for practice, but also because of some technical issues). After reading the Spark getting-started guide and all the related Stack Overflow posts, I don't know how to get these four methods to work; only one other approach works for me, and I don't know why.
Any help with any of them would be amazing!
I have a simple table like this in a txt file:
Jorgito 10 1 Soltero
Juanito 20 2 Casado
Jaimito 30 3 Divociado
First, some preliminary code:
import org.apache.spark.sql._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.SQLContext
import sqlContext.implicits._ // needed for rdd.toDF (imported automatically in spark-shell)

// RDD_datos is the RDD[String] read from the txt file, e.g. with sc.textFile
var RDD_filas = RDD_datos.map(_.split("\t"))

var esquema = new StructType()
  .add("Nombre", StringType)
  .add("Edad", IntegerType)
  .add("Hijos", IntegerType)
  .add("EC", StringType)

case class X(Nombre: String, Edad: Int, Hijos: Int, EC: String)
Then I apply all the methods I have seen, none of which works:
var DF_datos = RDD_filas.map({case Array(s0, s1, s2, s3) => X(s0, s1.trim.toInt, s2.trim.toInt, s3)}).toDF("Nombre","Edad","Hijos","EC")
var DF_datos2 = RDD_filas.map(p => X(p(0), p(1).trim.toInt,p(2).trim.toInt,p(3))).toDF("Nombre","Edad","Hijos","EC")
var DF_datos3 = RDD_filas.map { case Array(s0, s1, s2, s3) => Array(s0, s1.trim.toInt, s2.trim.toInt, s3) }.toDF("Nombre","Edad","Hijos","EC")
var DF_datos4 = sqlContext.createDataFrame(RDD_filas,esquema)
The first three methods let me create the DF and print its schema, but they don't keep the header (DF_datos.head() returns the first data row), and I get an error if I try DF_datos.show(). The strangest one (for me) is number 4, because it is supposed to be the most "canonical" way.
Only this worked for me:
var a = RDD_datos.map(_.split(" ")).take(3)
val rdd = sc.makeRDD(a)
val df = rdd.map {case Array(s0, s1, s2, s3) => X(s0, s1.toInt, s2.toInt, s3)}.toDF()
In order to use sqlContext.createDataFrame you will need to have an RDD[Row] where the types of the entries in your rows correspond to the types in your schema. Thus you will need to convert some entries from String to Int when appropriate. Here is an example:
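A minimal sketch, reusing the RDD_filas and esquema defined in the question (the intermediate name RDD_filas_tipadas is just illustrative):

import org.apache.spark.sql.Row

// Build an RDD[Row] whose entry types (String, Int, Int, String) match esquema
val RDD_filas_tipadas = RDD_filas.map {
  case Array(s0, s1, s2, s3) => Row(s0, s1.trim.toInt, s2.trim.toInt, s3)
}

val DF_tipado = sqlContext.createDataFrame(RDD_filas_tipadas, esquema)
DF_tipado.show()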
However, this is a lot of work, so I would recommend that you either a) upgrade to Spark 2.0 and use the built-in DataFrameReader csv loader, or b) look into spark-csv. In both cases you would simply set the delimiter to \s or \t as needed.
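As a rough sketch of both options (the file path datos.txt is a placeholder, and spark in option a) is the Spark 2.x SparkSession):

// a) Spark 2.x built-in csv reader
val df_a = spark.read
  .option("sep", "\t") // single-character delimiter, e.g. "\t" or " "
  .schema(esquema)
  .csv("datos.txt")

// b) Spark 1.6 with the spark-csv package
val df_b = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("delimiter", "\t")
  .schema(esquema)
  .load("datos.txt")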