Hi, first time posting out of desperation ^^U, I'm trying to make this work. The idea is: from a DataFrame with one column representing a list of ids, I want to return a new DataFrame with a new column containing the measures behind those ids for past records. I'm getting a "Task not serializable" error, and I think it points to the SparkContext instance, as seen in the log:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@42ebd0a3)
- field (class: Myclass$$anonfun$6, name: sc$1, type: class org.apache.spark.SparkContext)
- object (class Myclass$$anonfun$6, <function1>)
I guess there is something inside the map function that can't be there. Since the error points to the SparkContext, I am now explicitly passing the SparkContext as a parameter to both myMethod and MyDaoMethod, and all my classes implement Serializable.
Any help is welcome. Thank you.
def myMethod(df: DataFrame, factory: myFactory, sc: SparkContext)
            (implicit sqlContext: SQLContext): DataFrame = {
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types._
  import java.time.LocalDate
  import java.time.format.DateTimeFormatter
  import java.util

  // function to return the date n weeks ago
  val getDateNWeeksAgo: (String, Int) => String =
    (date: String, n: Int) =>
      LocalDate.parse(date, DateTimeFormatter.BASIC_ISO_DATE).minusWeeks(n).toString

  val myNewDF = df.rdd.map(r => {
    val name = r.getAs[String]("name")
    val ym: String = r.getAs[String]("ym")
    val dd: String = r.getAs[String]("dd")
    val ymd: String = r.getAs[String]("ymd")
    val mag = r.getAs[String]("mag")
    val listId = r.getAs[String]("list_id")                           // the list as a String, e.g. "[1, 5, 24]"
    val listSplit = listId.substring(1, listId.length - 1).split(",") // Array("1", " 5", " 24")
    val listValues = new util.ArrayList[String]()                     // list to store the computed values

    for (id <- 0 until listSplit.length) { // loop through the array of ids
      var value = 0d
      val meas1wAgo = findValueById(
        myDao.MyDaoMethod(name, getDateNWeeksAgo(ymd, 1), mag)(sqlContext, sc), // uses sqlContext and sc inside the map
        listSplit(id))
      /* more code regarding the algorithm with more measures n weeks ago */
      value = meas1wAgo.toDouble
      listValues.add(value.toString)
    }
    Row(name, ym, dd, mag, listId, listValues)
  })

  // Define the schema for the resulting DataFrame
  val schema = StructType(Seq(
    StructField("name", StringType, nullable = false),
    StructField("meas_ym", StringType, nullable = false),
    StructField("meas_dd", StringType, nullable = false),
    StructField("mag", StringType, nullable = false),
    StructField("list_id", StringType, nullable = false),
    StructField("listValues", DataTypes.createArrayType(DataTypes.StringType), nullable = false)
  ))

  // Create a DataFrame from the RDD[Row] with the specified schema
  val DFwithValues = sqlContext.createDataFrame(myNewDF, schema)
  DFwithValues
}
MyDaoMethod is defined outside the enclosing method and correctly queries the DB, returning a DataFrame with the measures for the desired date given a name, date and mag.
findValueById is also defined outside and correctly returns the measure as a String given a DataFrame and the id of the measure.
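For reference, simplified versions of their signatures (bodies omitted, parameter names illustrative), just to show how they match the calls above:

def MyDaoMethod(name: String, date: String, mag: String)
               (sqlContext: SQLContext, sc: SparkContext): DataFrame = {
  // queries the DB and returns the measures for the given name, date and mag;
  // called above as myDao.MyDaoMethod(...)(sqlContext, sc)
  ???
}

def findValueById(measures: DataFrame, id: String): String = {
  // looks up the measure for the given id in the measures DataFrame
  ???
}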
The stackTrace I'm getting is the following:
diagnostics: User class threw exception: org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:415)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:405)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:163)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2353)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:393)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:392)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.map(RDD.scala:392)
at ...(...scala:307)  /* user comment: this is the df.rdd.map line */
[...]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:675)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@42ebd0a3)
- field (class: MyClass$$anonfun$6, name: sc$1, type: class org.apache.spark.SparkContext)
- object (class Myclass$$anonfun$6, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:412)
... 25 more
In addition to mazaneicha's point: if you are using a Dataset/DataFrame in findValueById, use a join with your ID DataFrame, joining on the id, and let Spark manage the join for you. That is, build the ID DataFrame once and then join on it; don't do the lookup inside another object that requires a SparkContext.
You cannot use a SparkContext inside Spark operations; it only exists on the driver node.
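A rough sketch of that join-based approach (the column names and the way measuresDF is built are assumptions, not your actual code):

import org.apache.spark.sql.functions._

// Assumption: measuresDF has columns "id" and "value" and is built ONCE on the
// driver (e.g. from your DAO), instead of calling the DAO per row inside map.
val measuresDF: DataFrame = ???

// One row per id out of the "[1, 5, 24]" string...
val exploded = df.withColumn(
  "id",
  explode(split(regexp_replace(col("list_id"), "[\\[\\] ]", ""), ","))
)

// ...then let Spark resolve the lookup as a join and re-aggregate per original row.
val withValues = exploded
  .join(measuresDF, Seq("id"), "left_outer")
  .groupBy("name", "ym", "dd", "mag", "list_id")
  .agg(collect_list(col("value").cast("string")).as("listValues"))

Note that collect_list does not guarantee the values come back in the same order as the ids in list_id; if the order matters you would need to keep the element position (e.g. with posexplode) and sort before collecting.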