I'm having trouble getting my Dataflow template to work. It compiles without errors, but when I run the template the following error is shown:
2024-02-21 21:57:43.398 IST
com.google.cloud.teleport.v2.common.UncaughtExceptionLogger - The template launch failed.
java.lang.IllegalStateException: Unable to return a default Coder for ParMultiDo(Anonymous).out1 [PCollection@428160758]. Correct one of the following root causes:
  No Coder has been manually specified; you may do so using .setCoder().
  Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for V.
  Building a Coder using a registered CoderProvider failed.
  See suppressed exceptions for detailed failures.
  Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
    at org.apache.beam.sdk.util.Preconditions.checkStateNotNull(Preconditions.java:471)
    at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:282)
    at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:113)
    at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifying(TransformHierarchy.java:226)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:212)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:478)
    at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:609)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:324)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at com.google.cloud.teleport.v2.templates.DataStreamToSpannerBatch.run(DataStreamToSpannerBatch.java:596)
    at com.google.cloud.teleport.v2.templates.DataStreamToSpannerBatch.main(DataStreamToSpannerBatch.java:470)
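The phrase "Unable to provide a Coder for V" suggests the output type that Beam inferred is a bare type variable named V rather than a concrete class. The stdlib-only sketch below shows, under that assumption, how this can happen: when an anonymous subclass is created inside a generic method, its generic superclass records only the type variable, so reflection-based inference (roughly how Beam resolves a DoFn's output type) has nothing concrete to look up. `FakeDoFn`, `genericFn`, and `concreteFn` are hypothetical stand-ins, not Beam or template code, and whether this is the actual cause in the fork is an assumption.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.function.Function;

class ErasureDemo {
    // Hypothetical stand-in for Beam's DoFn<InputT, OutputT>; NOT the real Beam class.
    static abstract class FakeDoFn<InputT, OutputT> {
        abstract OutputT process(InputT input);
    }

    // Generic factory: the anonymous subclass is declared with the type variable V,
    // so only "V" (not Integer, String, ...) is recorded in its generic signature.
    static <V> FakeDoFn<String, V> genericFn(Function<String, V> f) {
        return new FakeDoFn<String, V>() {
            @Override
            V process(String input) {
                return f.apply(input);
            }
        };
    }

    // Concrete anonymous subclass: Integer is recorded in its generic signature.
    static FakeDoFn<String, Integer> concreteFn() {
        return new FakeDoFn<String, Integer>() {
            @Override
            Integer process(String input) {
                return input.length();
            }
        };
    }

    // Reads the OutputT argument from the anonymous class's generic superclass,
    // similar in spirit to how Beam infers a DoFn's output type.
    static Type outputType(FakeDoFn<?, ?> fn) {
        ParameterizedType sup = (ParameterizedType) fn.getClass().getGenericSuperclass();
        return sup.getActualTypeArguments()[1];
    }

    public static void main(String[] args) {
        Type generic = outputType(genericFn(s -> s.length()));
        Type concrete = outputType(concreteFn());
        System.out.println(generic.getTypeName() + " isTypeVariable=" + (generic instanceof TypeVariable));
        System.out.println(concrete.getTypeName());
    }
}
```

Running `main` should print `V isTypeVariable=true` followed by `java.lang.Integer`: only the concrete subclass carries a type that a coder registry could map to a coder.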
I've looked through all the PTransforms and can't pinpoint any obvious issue. Can anyone point me in the right direction?
The full code change I have made is available here (without using TupleTags) - https://github.com/GoogleCloudPlatform/DataflowTemplates/compare/main...manitgupta:DataflowTemplates:datastream-batch-template-v2?expand=1
The full code change I have made is available here (using TupleTags) - https://github.com/GoogleCloudPlatform/DataflowTemplates/compare/main...manitgupta:DataflowTemplates:datastream-batch-template?expand=1
This code is a fork of an already functioning template here - https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java
In that code, I have replaced a PTransform and a DoFn with my own: DataStreamRecordsToSpannerMutations and DataStreamRecordsToSpannerMutationsDoFn. My changes closely resemble the existing PTransforms used in the original template linked above.
Will appreciate any leads on this, thanks!
I also tried simplifying the code, changing the transformation from Datastream records to Mutation to the following:
PCollection<Mutation> spannerMutations =
    datastreamJsonRecords
        .apply("Convert Datastream record to Spanner mutation",
            ParDo.of(new DataStreamRecordsToSpannerMutationsDoFn(transformationContext, ddlView, schema, sourceType))
                .withSideInputs(ddlView))
        .setCoder(SerializableCoder.of(Mutation.class)); // pin the coder of this output explicitly
But I still get the same error.
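Note that `setCoder` only fixes the coder of the PCollection it is called on. If the failing output (named `ParMultiDo(Anonymous).out1` with type `V` in the log, not `Mutation`) comes from a different, generic helper, the concrete type would have to be supplied where that helper is instantiated: for example by calling `setCoder` on that helper's output, or, in Beam, by overriding `DoFn.getOutputTypeDescriptor()`. The stdlib-only sketch below illustrates the idea with an explicit `Class<V>` token. `TypedFn` and `genericFn` are hypothetical stand-ins, not Beam API, and this is one possible approach under that assumption.

```java
import java.util.function.Function;

class TypeTokenDemo {
    // Hypothetical stand-in for a DoFn that carries its output type explicitly;
    // NOT the real Beam DoFn.
    static abstract class TypedFn<InputT, OutputT> {
        private final Class<OutputT> outputClass;

        TypedFn(Class<OutputT> outputClass) {
            this.outputClass = outputClass;
        }

        abstract OutputT process(InputT input);

        // The output type comes from a runtime token supplied by the caller,
        // not from the (possibly type-variable-only) generic signature.
        Class<OutputT> outputType() {
            return outputClass;
        }
    }

    // Generic factory that still works for inference because the caller passes
    // the concrete class alongside the function.
    static <V> TypedFn<String, V> genericFn(Class<V> outputClass, Function<String, V> f) {
        return new TypedFn<String, V>(outputClass) {
            @Override
            V process(String input) {
                return f.apply(input);
            }
        };
    }

    public static void main(String[] args) {
        TypedFn<String, Integer> fn = genericFn(Integer.class, s -> s.length());
        System.out.println(fn.outputType().getName() + " " + fn.process("abc"));
    }
}
```

Running `main` prints `java.lang.Integer 3`. The design choice is the same one `setCoder` or an explicit coder argument makes in Beam: push the concrete type in from the call site instead of relying on reflection over a generic signature.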