Spark Dataframe schema definition using reflection with case classes and column name aliases

I ran into a small problem with my Spark Scala script. I have raw data that I aggregate, and after grouping, counting, etc., I want to save the output in a specific JSON format.

EDIT:

I tried to simplify the question and rewrote it:

When I select data from the source DataFrame with an Array[org.apache.spark.sql.Column] whose columns have aliases, and then use a variable holding a column name (or an index) while mapping the rows to a case class, I get a "Task not serializable" exception.

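    import org.apache.spark.sql.functions.col

    val dm = sqlContext.createDataFrame(Seq((1,"James"),(2,"Anna"))).toDF("id", "name")

    // Alias every column with a capitalized name: id -> Id, name -> Name
    val cl = dm.columns
    val cl2 = cl.map(name => col(name).as(name.capitalize))
    val dm2 = dm.select(cl2:_*)
    val n = "Name"
    case class Result(Name:String)
    // Throws "Task not serializable" when the column name comes from the variable n
    val r = dm2.map(row => Result(row.getAs(n))).toDF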

As for the second part of the question: I need the final schema to be an array of these Result objects, and I haven't figured out how to do that either. The expected result should have a schema like this:

    case class Test(var FilteredStatistics: Array[Result])
    val t = Test(Array(Result("Anna"), Result("James")))

    val t2 = sc.parallelize(Seq(t)).toDF

    scala> t2.printSchema
    root
     |-- FilteredStatistics: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- Name: string (nullable = true)

TL;DR:

  1. How to map dataframe rows to a case class object when dataframe columns have aliases and variables are used for column names?

  2. How to add these case class objects to an array?

There is 1 answer

Answered by Tzach Zohar

Serialization issue: the problem here is the val n = "Name". It is used inside an anonymous function passed into an RDD transformation (dm2.map(...)), which makes Spark close over that variable and the scope containing it. That scope also includes cl2, which has the type Array[Column] and is not serializable, so the whole closure fails to serialize.

The solution is simple - either inline n (to get dm2.map(row => Result(row.getAs("Name")))), or place it in a Serializable context (an object or a class that doesn't contain any non-serializable members).
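To illustrate the capture mechanism without Spark, here is a minimal sketch in plain Scala that uses Java serialization as a stand-in for what Spark does when it ships a closure to executors. The names Scope, NotSerializableThing, Names and ClosureDemo are hypothetical, and Scope only imitates the enclosing scope from the question; it is not how spark-shell is actually implemented.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable value such as Array[Column].
class NotSerializableThing

// Imitates the enclosing scope from the question: `n` lives next to a
// non-serializable member.
class Scope extends Serializable {
  val cl2 = new NotSerializableThing
  val n = "Name"
  // The lambda reads the field `n`, so it captures `this`, i.e. the whole Scope.
  val usesField: String => String = s => s + n
}

// A holder that contains only serializable members.
object Names extends Serializable {
  val n = "Name"
}

object ClosureDemo {
  // Returns true if Java serialization accepts the object and everything it references.
  def serializes(x: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(x)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val fromScope: String => String = new Scope().usesField
    // References the singleton directly, so nothing from an enclosing scope is captured.
    val fromObject: String => String = s => s + Names.n

    println(serializes(fromScope))  // false: serializing the closure drags in Scope.cl2
    println(serializes(fromObject)) // true: only serializable state is involved
  }
}
```

Run as a regular program, this should print false for the closure that reads n from Scope and true for the one that reads it from Names, which mirrors the second option above.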