I ran into a small problem with my Spark Scala script. Basically, I have raw data on which I run aggregations, and after grouping, counting, etc. I want to save the output in a specific JSON format.
EDIT:
I tried to simplify the question and rewrote it:
When I select data from the source dataframe with an Array[org.apache.spark.sql.Column] whose columns have aliases, and then use a variable holding the column name (or index) to map the rows to a case class, I get a "Task not serializable" exception.
import org.apache.spark.sql.functions.col
import sqlContext.implicits._  // for toDF; only needed when not running in spark-shell

val dm = sqlContext.createDataFrame(Seq((1, "James"), (2, "Anna"))).toDF("id", "name")
val cl = dm.columns
val cl2 = cl.map(name => col(name).as(name.capitalize))  // alias each column to its capitalized name
val dm2 = dm.select(cl2:_*)
val n = "Name"
case class Result(Name: String)
val r = dm2.map(row => Result(row.getAs(n))).toDF  // throws "Task not serializable"
And the second part of the question: I actually need the final schema to be an array of these Result objects. I still haven't figured out how to do this either. The expected result should have a schema like this:
case class Test(var FilteredStatistics: Array[Result])
val t = Test(Array(Result("Anna"), Result("James")))
val t2 = sc.parallelize(Seq(t)).toDF
scala> t2.printSchema
root
 |-- FilteredStatistics: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Name: string (nullable = true)
TL;DR:
How do I map dataframe rows to a case class object when the dataframe columns have aliases and variables are used for the column names?
How do I add these case class objects to an array?
Serialization Issue: the problem here is val n = "Name". It is used inside an anonymous function passed to an RDD transformation (dm2.map(...)), which makes Spark close over that variable and the scope containing it. That scope also includes cl2, which has the type Array[Column] and is not serializable, hence the "Task not serializable" exception.

The solution is simple: either inline n (to get dm2.map(row => Result(row.getAs("Name")))), or place it in a Serializable context (an object or a class that doesn't contain any non-serializable members).
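A minimal sketch of both fixes suggested above (the holder object and its name are just illustrative):

// Fix 1: inline the literal so the closure no longer drags in the surrounding scope
val r1 = dm2.map(row => Result(row.getAs[String]("Name"))).toDF

// Fix 2: keep the column name in a small holder that has no non-serializable members
object ColumnNames extends Serializable { val name = "Name" }
val r2 = dm2.map(row => Result(row.getAs[String](ColumnNames.name))).toDF

For the second part of the question (the Array[Result] schema), one possible approach, assuming the aggregated result is small enough to collect to the driver, is to collect the mapped rows and wrap them in the Test case class from the question:

// hypothetical sketch reusing Result and Test as defined in the question
val results: Array[Result] = dm2.map(row => Result(row.getAs[String]("Name"))).collect()
val wrapped = sc.parallelize(Seq(Test(results))).toDF
wrapped.printSchema  // same schema as shown in the question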