How to serialize runtime created class in Apache Beam


I have an apache-beam application that runs a pipeline both locally with the direct runner and on Google Cloud with the Dataflow runner. It works locally but fails on the Dataflow runner.

Here are the error traces:

(9938ce94c0752c7): java.lang.RuntimeException: com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo
    at com.google.cloud.dataflow.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:283)
    at com.google.cloud.dataflow.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:253)
    at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:55)
    at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:43)
    at com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:78)
    at com.google.cloud.dataflow.worker.MapTaskExecutorFactory.create(MapTaskExecutorFactory.java:142)
    at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:271)
    at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo
    at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
    at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
    at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
    at com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95)
    at com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:66)
    at com.google.cloud.dataflow.worker.MapTaskExecutorFactory.createParDoOperation(MapTaskExecutorFactory.java:360)
    at com.google.cloud.dataflow.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:271)
    ... 14 more
Caused by: java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo
    at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:75)
    at com.google.cloud.dataflow.worker.UserParDoFnFactory$UserDoFnExtractor.getDoFnInfo(UserParDoFnFactory.java:64)
    at com.google.cloud.dataflow.worker.UserParDoFnFactory$1.call(UserParDoFnFactory.java:100)
    at com.google.cloud.dataflow.worker.UserParDoFnFactory$1.call(UserParDoFnFactory.java:97)
    at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904)
    at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
    at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
    at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
    at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
    ... 20 more
Caused by: java.lang.ClassNotFoundException: Header_H
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:628)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
    at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1486)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1336)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:72)
    ... 28 more

It points to

"... unable to deserialize Serialized DoFnInfo"

and

"... java.lang.ClassNotFoundException: Header_H "

I suspect this has something to do with my use of ByteBuddy to create the class Header_H. I used ByteBuddy to build a subclass of some.class from the existing source code, plus additional user input read from a configuration file at runtime, i.e. Header_H only becomes available at runtime.

My ByteBuddy code is something like this:

builder = new ByteBuddy()
        .subclass(some.class)
        .name("Header_H")
        .modifiers(PUBLIC)
        .defineField("serialVersionUID", long.class, STATIC, PRIVATE, FINAL)
        .value(37L)
        .implement(Serializable.class);

Class<?> clazz = builder.make().load(getClass().getClassLoader()).getLoaded();

Then clazz (in this case Header_H) is passed to the pipeline in Dataflow. When I checked the contents of the jar file in the temporary Google Cloud staging location, I saw some.class but not Header_H.class, which probably causes the "ClassNotFoundException".
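One way to confirm this diagnosis is to list the staged jar's entries programmatically. A minimal, stdlib-only sketch (the jar built here is a stand-in for the actual staged jar, and the entry names are placeholders):

```java
import java.io.File;
import java.io.FileOutputStream;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;

public class JarInspector {
    // List every entry name in a jar, e.g. to check for Header_H.class.
    public static List<String> listEntries(File jar) throws Exception {
        List<String> names = new ArrayList<>();
        try (JarFile jarFile = new JarFile(jar)) {
            for (Enumeration<JarEntry> e = jarFile.entries(); e.hasMoreElements(); ) {
                names.add(e.nextElement().getName());
            }
        }
        return names;
    }

    public static void main(String[] args) throws Exception {
        // Build a tiny jar with one entry to demonstrate the check.
        File jar = File.createTempFile("staged", ".jar");
        try (JarOutputStream out = new JarOutputStream(new FileOutputStream(jar))) {
            out.putNextEntry(new JarEntry("some/Clazz.class"));
            out.closeEntry();
        }
        List<String> entries = listEntries(jar);
        System.out.println(entries.contains("some/Clazz.class")); // true
        System.out.println(entries.contains("Header_H.class"));   // false
        jar.delete();
    }
}
```

Running the same check against the jar in the staging bucket shows whether the dynamically generated class was shipped.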

So if my reasoning is right, how can I make Beam place the runtime-created class in the jar file sent to the Dataflow runner, given that I have implement(Serializable.class) in my class creation?


There are 2 answers

Rafael Winterhalter (best answer)

Byte Buddy can inject a class into an existing jar file via:

DynamicType.Unloaded<?> type = builder.make();
type.inject(someJar);

This will alter an existing jar file to include the dynamically generated class. This way, you can alter an existing jar that already is on the system class path.

This API will also allow you to create a new jar and you could use the Instrumentation API (via a Java agent) that allows you to append this class as a new jar file to the class path. To avoid attaching an agent, you can also try to use the byte-buddy-agent project for a dynamic attachment.

This would work by:

File jarFile = ...
DynamicType.Unloaded<?> type = builder.make();
type.toJar(jarFile);
ByteBuddyAgent.install().appendToSystemClassLoaderSearch(new JarFile(jarFile));

If the dynamic attachment is not allowed on Google Cloud, you might be able to solve this by a regular attachment on the command line.
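The regular attachment mentioned above would look roughly like this (the jar names and main class are placeholders; byte-buddy-agent's manifest declares a Premain-Class, so it can be passed directly as a -javaagent):

```shell
# Attach the instrumentation agent at JVM startup instead of relying on
# dynamic self-attachment, which some environments forbid.
java -javaagent:byte-buddy-agent.jar \
     -cp my-pipeline-bundled.jar \
     com.example.MyPipelineMain
```

With the agent attached at startup, ByteBuddyAgent.getInstrumentation() can retrieve the Instrumentation instance without a dynamic attach.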

jkff

The Dataflow runner does not control the contents of your JAR files - it only parses your program's classpath, reads the JARs from disk and copies them to your pipeline's staging directory on GCS. Right now Beam does not provide a way to ship classes that are not contained in the JARs on your classpath.

You will probably need to find a way to specify your pipeline using only classes from those JARs; however, you can of course still use ByteBuddy inside your DoFns or other code that runs on the workers. Note that anything shipped between workers (e.g. the contents of PCollections) still must be serializable (serialized on one worker and deserialized on another) or have a Coder.
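One way to apply this advice is to serialize only the *recipe* for the generated class and rebuild the class lazily on the worker (in Beam, typically inside a DoFn's @Setup). The following stdlib-only sketch uses Class.forName as a stand-in for the ByteBuddy subclassing step; HeaderRecipe is a hypothetical name, not part of any library:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical sketch: instead of serializing the generated class itself,
// serialize only its recipe (here: a class name) and rebuild the class on
// first use at the receiving side, so workers never need to deserialize
// a class like Header_H.
public class HeaderRecipe implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String baseClassName;   // the recipe survives serialization
    private transient Class<?> generated; // the class itself is rebuilt, not shipped

    public HeaderRecipe(String baseClassName) {
        this.baseClassName = baseClassName;
    }

    public Class<?> headerClass() throws ClassNotFoundException {
        if (generated == null) {
            // Stand-in for the ByteBuddy subclassing step, which would run
            // here, on the worker, at first use.
            generated = Class.forName(baseClassName);
        }
        return generated;
    }

    public static void main(String[] args) throws Exception {
        HeaderRecipe recipe = new HeaderRecipe("java.util.HashMap");

        // Round-trip through Java serialization, as Dataflow does with DoFns.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(recipe);
        }
        HeaderRecipe copy;
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            copy = (HeaderRecipe) in.readObject();
        }

        // The transient class was never serialized; it is rebuilt on demand.
        System.out.println(copy.headerClass().getName()); // java.util.HashMap
    }
}
```

In the real pipeline, the DoFn would hold only the configuration read from the file and run the ByteBuddy builder in @Setup on each worker.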

Alternatively, there might be a way to make ByteBuddy produce a JAR and dynamically add it to your program's classpath. That might work, but that's a ByteBuddy-specific question and I'm not familiar enough with ByteBuddy to tell how to do it.