Unable to return a default Coder for ParMultiDo(Anonymous).out1

I am having trouble getting my Dataflow template to work. It compiles without errors, but launching the template fails with the following error:

2024-02-21 21:57:43.398 IST
com.google.cloud.teleport.v2.common.UncaughtExceptionLogger - The template launch failed.
java.lang.IllegalStateException: Unable to return a default Coder for ParMultiDo(Anonymous).out1 [PCollection@428160758]. Correct one of the following root causes:
  No Coder has been manually specified;  you may do so using .setCoder().
  Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for V.
  Building a Coder using a registered CoderProvider failed.
  See suppressed exceptions for detailed failures.
  Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
    at org.apache.beam.sdk.util.Preconditions.checkStateNotNull(Preconditions.java:471)
    at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:282)
    at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:113)
    at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifying(TransformHierarchy.java:226)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:212)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:478)
    at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:609)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:324)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at com.google.cloud.teleport.v2.templates.DataStreamToSpannerBatch.run(DataStreamToSpannerBatch.java:596)
    at com.google.cloud.teleport.v2.templates.DataStreamToSpannerBatch.main(DataStreamToSpannerBatch.java:470)

I looked at all the PTransforms and can't pinpoint any obvious issues. Can anyone please point me in the right direction?

The full code change I have made is available here (without using TupleTags) - https://github.com/GoogleCloudPlatform/DataflowTemplates/compare/main...manitgupta:DataflowTemplates:datastream-batch-template-v2?expand=1

The full code change I have made is available here (using TupleTags) - https://github.com/GoogleCloudPlatform/DataflowTemplates/compare/main...manitgupta:DataflowTemplates:datastream-batch-template?expand=1

This code is a fork of an already functioning template here - https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java

I have replaced a PTransform and DoFn in the above code - DataStreamRecordsToSpannerMutations and DataStreamRecordsToSpannerMutationsDoFn. The code changes I've made closely resemble the existing PTransforms used in the original template linked above.

Will appreciate any leads on this, thanks!

I tried simplifying the above code a bit and changed the transformation from Datastream records to Mutation to the following:

PCollection<Mutation> spannerMutations = datastreamJsonRecords.apply("Convert Datastream record to Spanner mutation",
        ParDo.of(new DataStreamRecordsToSpannerMutationsDoFn(transformationContext, ddlView, schema, sourceType))
            .withSideInputs(ddlView)).setCoder(SerializableCoder.of(Mutation.class));

But I still get the same error.
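
One possible reading of "Unable to provide a Coder for V" is that the failing PCollection is produced by an anonymous DoFn whose output type is a type variable, which reflection cannot resolve to a concrete class. The name ParMultiDo(Anonymous) suggests an anonymous DoFn rather than DataStreamRecordsToSpannerMutationsDoFn, so calling .setCoder() on spannerMutations may not reach the PCollection named in the error. The plain-Java sketch below only illustrates the reflection behaviour; Fn is a hypothetical stand-in for a DoFn, not a Beam class, and isInferable is a simplified check written for this example.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class CoderInferenceSketch {

    // Hypothetical stand-in for a DoFn<InputT, OutputT>; this is NOT a Beam class.
    abstract static class Fn<InputT, OutputT> {}

    // Reads the OutputT type argument recorded on an anonymous subclass of Fn.
    static Type outputTypeOf(Fn<?, ?> fn) {
        ParameterizedType superType =
            (ParameterizedType) fn.getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[1];
    }

    // Anonymous subclass with a concrete output type: reflection sees Long.
    static Fn<String, Long> concreteFn() {
        return new Fn<String, Long>() {};
    }

    // Anonymous subclass created inside a generic method: reflection only sees
    // the type variable V, whatever the caller substitutes for it.
    static <V> Fn<String, V> genericFn() {
        return new Fn<String, V>() {};
    }

    // Simplified check (an assumption of this sketch): a bare type variable
    // carries no concrete class from which an encoder could be chosen.
    static boolean isInferable(Type type) {
        return !(type instanceof TypeVariable);
    }

    public static void main(String[] args) {
        Type concrete = outputTypeOf(concreteFn());
        Type generic = outputTypeOf(CoderInferenceSketch.<Object>genericFn());
        System.out.println(concrete + " inferable: " + isInferable(concrete));
        System.out.println(generic + " inferable: " + isInferable(generic));
    }
}
```

If this is what is happening, the coder would need to be set on the output of the anonymous DoFn itself, or that DoFn would need a concrete output type. Whether the forked template actually contains such a DoFn is not confirmed by the error message alone.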
