I am trying to use the Kryo serializer in Spark Streaming. I read in the Spark tuning docs that:

Finally, if you don’t register your custom classes, Kryo will still work, but it will have to store the full class name with each object, which is wasteful.

So I am trying to register all my classes. My case classes are:

trait Message extends java.io.Serializable

object MutableTypes {
  type Childs = scala.collection.mutable.Map[Int, (Long, Boolean)]
  type Parents = scala.collection.mutable.Map[Int, Childs]
}

case class IncomingRecord(id_1: String, id_raw: String, parents_to_add: MutableTypes.Parents, parents_to_delete: MutableTypes.Parents) extends Message

And I am registering the classes like this:

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired","true")
sparkConf.registerKryoClasses(Array(classOf[Tuple2[Long,Boolean]],classOf[IncomingRecord]))

I got this exception:

com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: scala.Tuple2$mcJZ$sp
Note: To register this class use: kryo.register(scala.Tuple2$mcJZ$sp.class);
Serialization trace:
parents_to_add (com.test.IncomingRecord)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:194)
    at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:147)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:185)
    at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
    at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

How can I register my classes? How do I solve this?

Update:

I know that turning registration off will remove the exception, but that forfeits the performance benefit, since Kryo then writes the full class name with each object. I want to know how I can register my classes.


There are 2 answers

Yaron On

Finally, if you don’t register your custom classes, Kryo will still work, but it will have to store the full class name with each object, which is wasteful.

This is true only when using the default value for spark.kryo.registrationRequired (which is false).

The following should solve the exception (alternatively, avoid setting any value for this parameter and use the default, which is false):

.set("spark.kryo.registrationRequired","false")

More info can be found here: http://spark.apache.org/docs/latest/configuration.html

spark.kryo.registrationRequired (default: false)
Whether to require registration with Kryo. If set to 'true', Kryo will throw an exception if an unregistered class is serialized. If set to false (the default), Kryo will write unregistered class names along with each object. Writing class names can cause significant performance overhead, so enabling this option can enforce strictly that a user has not omitted classes from registration.

Some points on how to register classes for Kryo serialization, with a sketch below.
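For the exception above: registering classOf[Tuple2[Long, Boolean]] only registers the generic scala.Tuple2, because the type arguments are erased. A (Long, Boolean) pair is compiled to the specialized class scala.Tuple2$mcJZ$sp (J = Long, Z = Boolean), which is exactly the class named in the exception, so it has to be registered by name. A minimal sketch, where the mutable.HashMap registration is my assumption (mutable.Map is typically backed by scala.collection.mutable.HashMap at runtime):

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")

sparkConf.registerKryoClasses(Array(
  classOf[IncomingRecord],
  // assumed runtime class behind MutableTypes.Parents / MutableTypes.Childs
  classOf[scala.collection.mutable.HashMap[_, _]],
  // specialized (Long, Boolean) tuple named in the exception; it has no
  // literal source name, so it is loaded reflectively
  Class.forName("scala.Tuple2$mcJZ$sp")
))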

Russell Bie On

I provide a method in another Stack Overflow answer to quickly get all the class names that need to be registered.

see: https://stackoverflow.com/a/55644422/5981256
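For illustration, here is a hypothetical probe (not necessarily the exact method in the linked answer): serialize a representative sample object with registration required, and harvest each "Class is not registered: ..." failure until serialization succeeds. The KryoRegistrationProbe name and the sample-object approach are my assumptions:

import java.io.ByteArrayOutputStream
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output
import scala.collection.mutable

object KryoRegistrationProbe {
  // Repeatedly attempt serialization, registering each class Kryo complains
  // about, until the sample serializes cleanly.
  def unregisteredClasses(sample: AnyRef): Set[String] = {
    val found = mutable.LinkedHashSet.empty[String]
    var done = false
    while (!done) {
      val kryo = new Kryo()
      kryo.setRegistrationRequired(true)
      found.foreach(name => kryo.register(Class.forName(name)))
      val out = new Output(new ByteArrayOutputStream())
      try {
        kryo.writeClassAndObject(out, sample)
        done = true
      } catch {
        case t: Throwable => found += extractClassName(t).getOrElse(throw t)
      } finally {
        out.close()
      }
    }
    found.toSet
  }

  // Kryo reports "Class is not registered: <name>", sometimes wrapped in a
  // KryoException, so walk the cause chain looking for that message.
  private def extractClassName(t: Throwable): Option[String] = {
    var cur: Throwable = t
    while (cur != null) {
      val msg = Option(cur.getMessage).getOrElse("")
      if (msg.contains("Class is not registered"))
        return Some(msg.split("\n").head.stripPrefix("Class is not registered: ").trim)
      cur = cur.getCause
    }
    None
  }
}

The collected names can then be fed back into Spark, e.g. sparkConf.registerKryoClasses(names.map(Class.forName).toArray).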