I have an Apache Beam application that runs a pipeline both locally with the direct runner and on Google Cloud using the Dataflow runner. It works locally but fails on the Google Dataflow runner.
Here are the error traces:
(9938ce94c0752c7): java.lang.RuntimeException: com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo
    at com.google.cloud.dataflow.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:283)
    at com.google.cloud.dataflow.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:253)
    at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:55)
    at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:43)
    at com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:78)
    at com.google.cloud.dataflow.worker.MapTaskExecutorFactory.create(MapTaskExecutorFactory.java:142)
    at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:271)
    at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo
    at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
    at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
    at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
    at com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95)
    at com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:66)
    at com.google.cloud.dataflow.worker.MapTaskExecutorFactory.createParDoOperation(MapTaskExecutorFactory.java:360)
    at com.google.cloud.dataflow.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:271)
    ... 14 more
Caused by: java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo
    at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:75)
    at com.google.cloud.dataflow.worker.UserParDoFnFactory$UserDoFnExtractor.getDoFnInfo(UserParDoFnFactory.java:64)
    at com.google.cloud.dataflow.worker.UserParDoFnFactory$1.call(UserParDoFnFactory.java:100)
    at com.google.cloud.dataflow.worker.UserParDoFnFactory$1.call(UserParDoFnFactory.java:97)
    at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904)
    at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
    at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
    at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
    at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
    ... 20 more
Caused by: java.lang.ClassNotFoundException: Header_H
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:628)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
    at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1486)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1336)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:72)
    ... 28 more
The key lines are "... unable to deserialize Serialized DoFnInfo" and "... java.lang.ClassNotFoundException: Header_H".
I suspect this has something to do with my use of Byte Buddy to create the class Header_H. I use Byte Buddy to build a subclass of some.class from the existing source code plus additional user input from a configuration file at runtime, i.e. Header_H only becomes available at runtime.
My Byte Buddy code looks roughly like this:
builder = new ByteBuddy()
    .subclass(some.class)
    .name("Header_H")
    .modifiers(PUBLIC)
    .defineField("serialVersionUID", long.class, STATIC, PRIVATE, FINAL).value(37L)
    .implement(Serializable.class);
Class<?> clazz = builder.make().load(getClass().getClassLoader()).getLoaded();
clazz (in this case Header_H) is then passed to the pipeline in Dataflow. When I checked the contents of the jar file in the temporary Google Cloud staging location, I saw some.class but not Header_H.class, and that probably causes the "ClassNotFoundException" error.
So if my reasoning is right, how can I make Beam place a runtime-created class in the jar file that is sent to the Dataflow runner, given that I have implement(Serializable.class) in my class creation?
Byte Buddy can inject a class into a jar file via DynamicType#inject.
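A minimal sketch, assuming builder is the DynamicType.Builder from your question and someJar is a placeholder File pointing at the jar you want to modify:

DynamicType.Unloaded<?> dynamicType = builder.make();
// Rewrites someJar so that it additionally contains Header_H.class;
// all other entries of the jar are kept as they are.
dynamicType.inject(someJar);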
This will alter an existing jar file to include the dynamically generated class. This way, you can alter an existing jar that already is on the system class path.
The API also allows you to create a new jar, and you could then use the Instrumentation API (via a Java agent) to append this class as a new jar file to the class path. To avoid attaching an agent manually, you can also try the byte-buddy-agent project for a dynamic attachment. This would work roughly as shown below:
If dynamic attachment is not allowed on Google Cloud, you might be able to solve this with a regular agent attachment on the command line.