I ran into a little problem with my Spark Scala script. Basically, I have raw data on which I do aggregations, and after grouping, counting, etc. I want to save the output in a specific JSON format.
EDIT:
I tried to simplify the question and rewrote it:
When I select data from the source DataFrame with an Array[org.apache.spark.sql.Column] whose columns have aliases, and then use variables holding the column names (or indeed indices) to map the rows to a case class, I get a "Task not serializable" exception.
var dm = sqlContext.createDataFrame(Seq((1,"James"),(2,"Anna"))).toDF("id", "name")
val cl = dm.columns
val cl2 = cl.map(name => col(name).as(name.capitalize))
val dm2 = dm.select(cl2:_*)
val n = "Name"
case class Result(Name:String)
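// the following line throws the "Task not serializable" exception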
val r = dm2.map(row => Result(row.getAs(n))).toDF
And the second part of the question: I actually need the final schema to be an array of these Result
case class objects. I still haven't figured out how to do this either. The expected result should have a schema like this:
case class Test(var FilteredStatistics: Array[Result])
val t = Test(Array(Result("Anna"), Result("James")))
val t2 = sc.parallelize(Seq(t)).toDF
scala> t2.printSchema
root
|-- FilteredStatistics: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Name: string (nullable = true)
TL;DR:
How to map dataframe rows to a case class object when dataframe columns have aliases and variables are used for column names?
How to add these case class objects to an array?
Serialization issue: the problem here is the val n = "Name": it is used inside an anonymous function passed into an RDD transformation (dm2.map(...)), which makes Spark close over that variable and the scope containing it, which also includes cl2 of type Array[Column], hence it isn't serializable.

The solution is simple - either inline n (to get dm2.map(row => Result(row.getAs("Name")))), or place it in a serializable context (an object or a class that doesn't contain any non-serializable members).
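To illustrate, here is a minimal sketch of both options, assuming the same shell session as in the question (dm2 and case class Result already defined); the helper object ColumnNames is just an illustrative name, not part of the original code:

// keeping the column name in a small serializable object avoids capturing
// the surrounding scope (and with it the non-serializable cl2) in the closure
object ColumnNames extends Serializable {
  val name = "Name"
}

// option 1: inline the literal, so no outer variable is captured
val r1 = dm2.map(row => Result(row.getAs[String]("Name"))).toDF

// option 2: reference the constant from the serializable object
val r2 = dm2.map(row => Result(row.getAs[String](ColumnNames.name))).toDF

Option 1 is exactly the inlining suggested above; option 2 shows one way to keep a named constant for the column name without pulling cl2 into the closure.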