I'm running a Spark application in cluster mode via spark-submit, with a custom Kryo registrator. The JAR is built with sbt "+assembly". Occasionally, after the job has been running for a while, some executors produce errors like the one below during a treeAggregate operation. What's strange is that the error only happens intermittently, after many objects have already been serialized successfully by Kryo; often the entire job finishes without apparent issue. When the error does appear, it's usually accompanied by the task hanging, and I have to kill Spark manually.
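
For context, the failing stage is a treeAggregate roughly along these lines (a simplified sketch with placeholder names, not my actual aggregation logic; `data` is an RDD[Array[Double]] and `dim` is the vector length):

// simplified sketch of the failing stage: per-partition sums combined tree-wise
val sums: Array[Double] = data.treeAggregate(Array.fill(dim)(0.0))(
  seqOp = (acc, v) => {
    var i = 0
    while (i < dim) { acc(i) += v(i); i += 1 }
    acc
  },
  combOp = (a, b) => {
    var i = 0
    while (i < dim) { a(i) += b(i); i += 1 }
    a
  }
)

Here is the full error and stack trace from an executor: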

17/08/29 22:39:04 ERROR TransportRequestHandler: Error opening block StreamChunkId{streamId=542074019336, chunkIndex=0} for request from /10.0.29.65:34332
org.apache.spark.SparkException: Failed to register classes with Kryo
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:139)
    at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:292)
    at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:277)
    at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:186)
    at org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:169)
    at org.apache.spark.storage.BlockManager$$anonfun$dropFromMemory$3.apply(BlockManager.scala:1382)
    at org.apache.spark.storage.BlockManager$$anonfun$dropFromMemory$3.apply(BlockManager.scala:1377)
    at org.apache.spark.storage.DiskStore.put(DiskStore.scala:69)
    at org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:1377)
    at org.apache.spark.storage.memory.MemoryStore.org$apache$spark$storage$memory$MemoryStore$$dropBlock$1(MemoryStore.scala:524)
    at org.apache.spark.storage.memory.MemoryStore$$anonfun$evictBlocksToFreeSpace$2.apply(MemoryStore.scala:545)
    at org.apache.spark.storage.memory.MemoryStore$$anonfun$evictBlocksToFreeSpace$2.apply(MemoryStore.scala:539)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.storage.memory.MemoryStore.evictBlocksToFreeSpace(MemoryStore.scala:539)
    at org.apache.spark.memory.StorageMemoryPool.acquireMemory(StorageMemoryPool.scala:92)
    at org.apache.spark.memory.StorageMemoryPool.acquireMemory(StorageMemoryPool.scala:73)
    at org.apache.spark.memory.StaticMemoryManager.acquireStorageMemory(StaticMemoryManager.scala:72)
    at org.apache.spark.storage.memory.MemoryStore.putBytes(MemoryStore.scala:147)
    at org.apache.spark.storage.BlockManager.maybeCacheDiskBytesInMemory(BlockManager.scala:1143)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doGetLocalBytes(BlockManager.scala:594)
    at org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:559)
    at org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:559)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:559)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:353)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:61)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:60)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
    at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:89)
    at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:125)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:103)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.foo.bar.MyKryoRegistrator
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$5.apply(KryoSerializer.scala:134)
    at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$5.apply(KryoSerializer.scala:134)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:134)
    ... 60 more

My Spark session looks like this:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("FooBar")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryoserializer.buffer.max", "2047m")
  .config("spark.kryo.registrator", "com.foo.bar.MyKryoRegistrator")
  .config("spark.kryo.registrationRequired", "true")
  .config("spark.network.timeout", "3600s")
  .config("spark.driver.maxResultSize", "0")
  .config("spark.rdd.compress", "true")
  .config("spark.shuffle.spill", "true")
  .getOrCreate()

and I'm passing these configuration options to spark-submit:

--conf "spark.executor.heartbeatInterval=400s"
--conf "spark.speculation=true"
--conf "spark.speculation.multiplier=30"
--conf "spark.speculation.quantile=0.95"
--conf "spark.memory.useLegacyMode=true"
--conf "spark.shuffle.memoryFraction=0.8"
--conf "spark.storage.memoryFraction=0.2"
--driver-java-options "-XX:+UseG1GC"
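
Putting it together, the full invocation looks roughly like this (the master, main class, and JAR path here are placeholders):

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.foo.bar.Main \
  --conf "spark.executor.heartbeatInterval=400s" \
  --conf "spark.speculation=true" \
  --conf "spark.speculation.multiplier=30" \
  --conf "spark.speculation.quantile=0.95" \
  --conf "spark.memory.useLegacyMode=true" \
  --conf "spark.shuffle.memoryFraction=0.8" \
  --conf "spark.storage.memoryFraction=0.2" \
  --driver-java-options "-XX:+UseG1GC" \
  my-app-assembly.jar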

This is what my registrator code looks like:

package com.foo.bar

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Array[Array[Byte]]])
    kryo.register(classOf[Array[Double]])
    kryo.register(classOf[Array[Int]])
    kryo.register(classOf[Array[Object]])
    kryo.register(classOf[Array[scala.collection.immutable.Range.Inclusive]])
    kryo.register(classOf[scala.collection.mutable.WrappedArray.ofRef[_]])
    kryo.register(Class.forName("scala.reflect.ClassTag$$anon$1"))
    kryo.register(classOf[scala.util.matching.Regex])

    kryo.register(classOf[java.lang.Class[_]])
    kryo.register(classOf[java.util.Date])

    kryo.register(classOf[awscala.s3.Bucket])
    kryo.register(classOf[com.amazonaws.services.s3.model.Owner])

    kryo.register(Class.forName("org.apache.spark.broadcast.TorrentBroadcast"))

    kryo.register(Class.forName("org.apache.spark.ml.feature.Instance"))
    kryo.register(classOf[org.apache.spark.ml.linalg.DenseVector])
    kryo.register(classOf[org.apache.spark.ml.linalg.SparseVector])
    kryo.register(classOf[org.apache.spark.ml.linalg.Vector])
    kryo.register(classOf[org.apache.spark.mllib.stat.MultivariateOnlineSummarizer])
    kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericInternalRow])
    kryo.register(Class.forName("org.apache.spark.sql.execution.columnar.CachedBatch"))
    kryo.register(classOf[org.apache.spark.storage.BroadcastBlockId])
  }
}
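
Since the root cause is a ClassNotFoundException, an obvious sanity check is whether executors can load the registrator from inside a task at all, with something like this rough sketch (note that task code resolves classes through Spark's user classloader, which may differ from the classloader used by the executor's network threads where the error above originates):

// rough sketch: attempt to load the registrator from inside tasks on each executor
val classVisible = spark.sparkContext
  .parallelize(1 to 1000, 100)
  .map { _ =>
    val host = java.net.InetAddress.getLocalHost.getHostName
    val loaded =
      try { Class.forName("com.foo.bar.MyKryoRegistrator"); true }
      catch { case _: ClassNotFoundException => false }
    (host, loaded)
  }
  .distinct()
  .collect()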

I found a discussion of what sounds like a very similar issue (KryoSerializer cannot find my SparkKryoRegistrator), but unfortunately it has no answer.

Any idea what the issue might be? Is this a bug in Spark?
