How to specify a BigQuery schema in Apache Beam BigQueryIO (TypeScript SDK)

While creating an Apache Beam pipeline that reads from Pub/Sub (readFromPubSub) and writes to BigQuery (writeToBigQuery) with the TypeScript SDK, I get the following error:

Error: java.lang.IllegalArgumentException: The input doesn't has a schema

I can't find a way to specify the input schema with the TypeScript SDK, and the documentation doesn't cover it either.

The Python and Java SDKs, by contrast, document how to specify a schema.
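For comparison, the Python SDK's WriteToBigQuery accepts a schema as a string of the form "field1:TYPE1,field2:TYPE2", while the TypeScript writeToBigQuery has no such parameter. The sketch below is purely illustrative: parseSchemaString and exemplarFor are hypothetical helpers (not part of apache-beam) that turn such a string into a field list and an example object. The example-object idea assumes the TypeScript SDK's beam.withRowCoder(exemplar) helper, which infers a row schema from an example value; confirm that your SDK version exports it, and how it maps JavaScript numbers to Beam types, before relying on this.

```typescript
// Hypothetical helpers (NOT part of apache-beam): convert a Python-SDK-style
// schema string ("name:TYPE,name:TYPE") into field specs and an example row.

type BigQueryType = "STRING" | "INTEGER" | "FLOAT" | "BOOLEAN";

interface FieldSpec {
  name: string;
  type: BigQueryType;
}

type ExemplarValue = string | number | boolean;

// Only a few scalar types are handled in this sketch.
const SUPPORTED_TYPES: ReadonlySet<string> = new Set([
  "STRING",
  "INTEGER",
  "FLOAT",
  "BOOLEAN",
]);

function parseSchemaString(schema: string): FieldSpec[] {
  return schema.split(",").map((part) => {
    const [rawName, rawType] = part.split(":").map((s) => s.trim());
    if (!rawName || !rawType) {
      throw new Error(`Malformed field spec: "${part}"`);
    }
    const type = rawType.toUpperCase();
    if (!SUPPORTED_TYPES.has(type)) {
      throw new Error(`Type not handled by this sketch: ${rawType}`);
    }
    return { name: rawName, type: type as BigQueryType };
  });
}

// Build an example object whose JavaScript value types mirror the declared
// BigQuery types. Whether the SDK infers INT64 or DOUBLE from a JS number
// is an assumption that is NOT verified here.
function exemplarFor(fields: FieldSpec[]): Record<string, ExemplarValue> {
  const row: Record<string, ExemplarValue> = {};
  for (const field of fields) {
    switch (field.type) {
      case "STRING":
        row[field.name] = "";
        break;
      case "INTEGER":
        row[field.name] = 0;
        break;
      case "FLOAT":
        row[field.name] = 0.5;
        break;
      case "BOOLEAN":
        row[field.name] = false;
        break;
    }
  }
  return row;
}
```

For example, exemplarFor(parseSchemaString("user_id:STRING,score:INTEGER")) yields an object with an empty-string user_id and a numeric score, which could then be passed to beam.withRowCoder, assuming that helper behaves as described above.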

Sample code that produces the error:

import * as beam from "apache-beam";
import * as bigqueryio from "apache-beam/io/bigqueryio";

export function createPipeline() {
  // A pipeline is simply a callable that takes a root object.
  return async (root: beam.Root) => {
    // Read raw messages from the Pub/Sub topic.
    const pcollection = await root.applyAsync(
      beam.readFromPubSub({
        topic: "projects/PROJECT_NAME/topics/TOPIC_NAME",
      })
    );

    // Write to an existing table; this is the step that fails during expansion.
    await pcollection.applyAsync(
      bigqueryio.writeToBigQuery("PROJECT_NAME:DATASET.TABLE_NAME", {
        createDisposition: "Never",
      })
    );
  };
}

bigqueryio.writeToBigQuery(...) has no schema parameter. Also, according to the docs, a schema isn't required when createDisposition is "Never".

Is there another way to specify the schema in TypeScript, or some other way to fix this issue?
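The stack trace in the log passes through BigQuerySchemaIOProvider into BigQueryIO$Write.expandTyped, which suggests the Java side is checking for a Beam schema on the input PCollection rather than for the BigQuery table schema. That would explain why createDisposition: "Never" does not help. Pub/Sub messages arrive as raw bytes with no schema attached, so a plausible first step is to decode each payload into a flat object. A minimal sketch, assuming the messages are UTF-8 JSON and the table has hypothetical columns user_id (STRING) and score (INTEGER):

```typescript
// Hypothetical row shape; replace with the real table's columns.
interface ScoreRow {
  user_id: string;
  score: number;
}

const decoder = new TextDecoder("utf-8");

// Decode one Pub/Sub payload (assumed to be UTF-8 JSON) into a ScoreRow,
// rejecting messages that do not match the expected shape.
function decodeMessage(payload: Uint8Array): ScoreRow {
  const parsed: unknown = JSON.parse(decoder.decode(payload));
  if (typeof parsed !== "object" || parsed === null) {
    throw new Error("Payload is not a JSON object");
  }
  const { user_id: userId, score } = parsed as Record<string, unknown>;
  if (
    typeof userId !== "string" ||
    typeof score !== "number" ||
    !Number.isInteger(score)
  ) {
    throw new Error(`Unexpected payload: ${JSON.stringify(parsed)}`);
  }
  // Copy only the declared columns so extra JSON fields do not reach the row.
  return { user_id: userId, score };
}
```

In the pipeline, the decoded collection would still need a schema before the write, for example something like pcollection.map(decodeMessage).apply(beam.withRowCoder({ user_id: "", score: 0 })) followed by the writeToBigQuery step. This assumes the installed SDK exports withRowCoder and that the inferred field types match the table; it has not been tested against a real BigQuery table.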

Here is the complete error log:

...
Tearing down sdks:java:io:google-cloud-platform:expansion-service:shadowJar.
/home/mitanshu/Desktop/playground/beam-starter-typescript/node_modules/apache-beam/dist/src/apache_beam/transforms/external.js:173
                throw new Error(response.error);
                      ^

Error: java.lang.IllegalArgumentException: The input doesn't has a schema
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:3241)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:3223)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:2174)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:479)
        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:352)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaIOProvider$BigQuerySchemaIO$2.expand(BigQuerySchemaIOProvider.java:231)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaIOProvider$BigQuerySchemaIO$2.expand(BigQuerySchemaIOProvider.java:193)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:496)
        at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:402)
        at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:555)
        at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:639)
        at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:306)
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:355)
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:867)
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)

    at RawExternalTransform.expandInternalAsync (/home/mitanshu/Desktop/playground/beam-starter-typescript/node_modules/apache-beam/dist/src/apache_beam/transforms/external.js:173:23)
    at runMicrotasks (<anonymous>)
    at processTicksAndRejections (node:internal/process/task_queues:96:5)
    at async Pipeline.applyAsyncTransform (/home/mitanshu/Desktop/playground/beam-starter-typescript/node_modules/apache-beam/dist/src/apache_beam/internal/pipeline.js:202:22)

Thanks!
