Why is Spark performing worse when using Kryo serialization?

I enabled Kryo serialization for my Spark job, enabled the setting to require registration, and ensured all my types were registered.

val conf = new SparkConf()
// use Kryo instead of the default Java serialization
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// fail fast if anything is serialized without being registered
conf.set("spark.kryo.registrationRequired", "true")
// register every application class that gets shuffled
conf.registerKryoClasses(classes)
// pre-register the Avro schemas so they aren't shipped with every record
conf.registerAvroSchemas(avroSchemas: _*)
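
For reference, classes is just an array of the types that get shuffled. The exact list below is hypothetical, but with registrationRequired turned on you may also need to register the array variants of those types:

// Hypothetical registration list -- the real job registers every shuffled type.
val classes: Array[Class[_]] = Array(
  classOf[A],
  classOf[B],
  classOf[Array[A]],  // array types can also need registering when registration is required
  classOf[Array[B]]
)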

Wall-clock performance of the job worsened by about 20%, and the number of bytes shuffled increased by almost 400%.

This seems really surprising to me, given the Spark documentation's suggestion that Kryo should be better:

Kryo is significantly faster and more compact than Java serialization (often as much as 10x)

I manually invoked the serialize method on instances of Spark's org.apache.spark.serializer.KryoSerializer and org.apache.spark.serializer.JavaSerializer with an example of my data. The results were consistent with the suggestions in the Spark documentation: Kryo produced 98 bytes; Java produced 993 bytes. That really is a 10x improvement.
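
The comparison was done roughly like this (a minimal sketch; the sample record value is elided, and registration is relaxed just for the ad-hoc test):

import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}

val conf = new SparkConf().set("spark.kryo.registrationRequired", "false")
val kryo = new KryoSerializer(conf).newInstance()
val java = new JavaSerializer(conf).newInstance()

val record: A = ???  // an example of my data (elided)

// serialize returns a ByteBuffer; remaining() is its size in bytes
println(s"Kryo: ${kryo.serialize(record).remaining()} bytes")
println(s"Java: ${java.serialize(record).remaining()} bytes")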

A possibly confounding factor is that the objects which are being serialized and shuffled implement the Avro GenericRecord interface. I tried registering the Avro schemas in the SparkConf, but that showed no improvement.

I also tried shuffling the data with new classes that were plain Scala case classes, without any of the Avro machinery. It didn't improve shuffle performance or the number of bytes exchanged.

The Spark code ends up boiling down to the following:

case class A(
    f1: Long,
    f2: Option[Long],
    f3: Int,
    f4: Int,
    f5: Option[String],
    f6: Option[Int],
    f7: Option[String],
    f8: Option[Int],
    f9: Option[Int],
    f10: Option[Int],
    f11: Option[Int],
    f12: String,
    f13: Option[Double],
    f14: Option[Int],
    f15: Option[Double],
    f16: Option[Double],
    f17: List[String],
    f18: String) extends org.apache.avro.specific.SpecificRecordBase {
  def get(f: Int): AnyRef = ???
  def put(f: Int, value: Any): Unit = ???
  def getSchema(): org.apache.avro.Schema = A.SCHEMA$
}
object A extends AnyRef with Serializable {
  val SCHEMA$: org.apache.avro.Schema = ???
}

case class B(
    f1: Long,
    f2: Long,
    f3: String,
    f4: String) extends org.apache.avro.specific.SpecificRecordBase {
  def get(field$: Int): AnyRef = ???
  def getSchema(): org.apache.avro.Schema = B.SCHEMA$
  def put(field$: Int, value: Any): Unit = ???
}
object B extends AnyRef with Serializable {
  val SCHEMA$ : org.apache.avro.Schema = ???
}

def join(as: RDD[A], bs: RDD[B]): RDD[(Iterable[A], Iterable[B])] = {
  // Key both RDDs by f1, cogroup them (this triggers the shuffle), then drop the key.
  val joined = as.map(a => a.f1 -> a) cogroup bs.map(b => b.f1 -> b)
  joined.map { case (_, asAndBs) => asAndBs }
}

Do you have any idea what might be going on or how I could get the better performance that should be available from Kryo?


There are 2 answers

Biju CD

If your individual records are very small and you have a huge number of them, that can make the job slow. Try increasing the Kryo buffer size and see whether it makes any improvement.

Try the settings below if you haven't already:

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // 24 MB of Kryo buffer instead of the 0.064 MB default
  .set("spark.kryoserializer.buffer.mb", "24")

Ref: https://ogirardot.wordpress.com/2015/01/09/changing-sparks-default-java-serialization-to-kryo/
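
On newer Spark versions the .mb variant of this key is deprecated in favour of unit-suffixed values; the equivalent settings would look roughly like this (a sketch, keeping the same 24 MB suggestion):

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer", "24m")      // per-core buffer; the default is 64k
  .set("spark.kryoserializer.buffer.max", "64m")  // ceiling a single large record may grow the buffer to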

Paul Back

Since you have high-cardinality RDDs, broadcasting one side (a broadcast hash join) unfortunately seems to be off limits.

Your best bet is to coalesce() your RDDs prior to joining. Are you seeing high skew in your shuffle times? If so, you may want to coalesce with shuffle = true.
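
A rough sketch of what that could look like on the RDDs from the question (the partition count and intermediate names are purely illustrative; tune for your data and cluster):

// shuffle = true redistributes records evenly across the new partitions,
// which can help when shuffle times are heavily skewed
val numPartitions = 200  // illustrative only
val asByKey = as.coalesce(numPartitions, shuffle = true).map(a => a.f1 -> a)
val bsByKey = bs.coalesce(numPartitions, shuffle = true).map(b => b.f1 -> b)
val joined = asByKey cogroup bsByKey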

Lastly, if you have RDDs of nested structures (e.g. JSON), that can sometimes let you bypass shuffles entirely. Check out the slides and/or video here for a more detailed explanation.