I have been experimenting with Apache Spark trying to solve some queries like top-k, skyline etc.
I have made a wrapper which encloses SparkConf
and JavaSparkContext
named SparkContext
. This class also implements serializable but since SparkConf
and JavaSparkContext
are not serializable then the class isn't either.
I have a class solving the topK query named TopK
, the class implements serializable but the class also has a SparkContext
member variable which is not serializable (for the reason above). Therefore I am getting an exception whenever I try to execute a TopK
method from within a .reduce()
function in an RDD.
The solution I have found is to make SparkContext
transient.
My question is: Should I keep the SparkContext
variable as transient or am I doing a big mistake?
SparkContext
class:
import java.io.Serializable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
public class SparkContext implements Serializable {
private final SparkConf sparConf; // this is not serializable
private final JavaSparkContext sparkContext; // this is not either
protected SparkContext(String appName, String master) {
this.sparConf = new SparkConf();
this.sparConf.setAppName(appName);
this.sparConf.setMaster(master);
this.sparkContext = new JavaSparkContext(sparConf);
}
protected JavaRDD<String> textFile(String path) {
return sparkContext.textFile(path);
}
}
TopK
class:
public class TopK implements QueryCalculator, Serializable {
private final transient SparkContext sparkContext;
.
.
.
}
Example that throws Task not serializable
exception. getBiggestPointByXDimension
won't even get entered because in order for it to be executed in a reduce function the class enclosing it (TopK
) must be serializable.
private Point findMedianPoint(JavaRDD<Point> points) {
Point biggestPointByXDimension = points.reduce((a, b) -> getBiggestPointByXDimension(a, b));
.
.
.
}
private Point getBiggestPointByXDimension(Point first, Point second) {
return first.getX() > second.getX() ? first : second;
}
To your question: Should I keep the SparkContext variable as transient?
Yes. That's ok. It's only encapsulating the (Java)SparkContext and the context is not usable on the workers, so marking it
transient
just tells the Serializer not to serialize that field.You could also have your own
SparkContext
wrapper not serializable and mark it as transient - same effect as above. (BTW, Given that SparkContext is the Scala class name for the spark context, I'd chose another name to avoid confusion down the road.)One more thing: As you pointed out, the reason why Spark is trying to serialize the complete enclosing class, is because a method of the class is being used within a closure. Avoid that!. Use an anonymous class or a self-contained closure (which will translate into an anonymous class at the end).