Apache Beam expansion service fails to build transform for Python ReadFromJdbc (apache_beam.io.jdbc)


I am trying to read data from TIBCO Data Virtualization (TDV) using ReadFromJdbc from the apache_beam.io.jdbc module: https://beam.apache.org/releases/pydoc/2.43.0/_modules/apache_beam/io/jdbc.html#ReadFromJdbc

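    import typing

    import apache_beam as beam
    from apache_beam import coders
    from apache_beam.io.jdbc import ReadFromJdbc

    # Row type describing the result set (a single integer column).
    JdbcTestRow = typing.NamedTuple("JdbcTestRow",[("column1",int)])
    coders.registry.register_coder(JdbcTestRow, coders.RowCoder)

    # pipeline_options, jdbc_conn_url, driver_class_name, sql_username,
    # sql_password and sql_query are defined elsewhere in the script.
    with beam.Pipeline(options=pipeline_options) as p: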
        raw_records = (
            p
            | "Initialise" >> beam.Create(["Initialize"])
            | 'Read Records from JDBC' >> ReadFromJdbc(
                    table_name='test_table'
                    ,jdbc_url=jdbc_conn_url
                    ,driver_class_name=driver_class_name
                    ,classpath=["/tmp/csjdbc.jar","/tmp/csjdbc8.jar"]
                    ,username=sql_username
                    ,password=sql_password
                    ,query=sql_query
                    ,fetch_size=1000
            )
        )

I downloaded the expansion service jar beam-sdks-java-extensions-schemaio-expansion-service-2.43.0.jar and copied it into the Apache Beam cache directory ~/.apache_beam/cache/jars
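To rule out a misplaced jar, the copy step above can be checked with a few lines of standard-library Python. This is only a sketch: the cache directory and jar name are the ones quoted in this question, while the helper names cached_jar_path and describe_jar are made up for illustration and are not part of Apache Beam.

```python
from pathlib import Path

# Values taken from the question; adjust them for your own Beam version.
BEAM_VERSION = "2.43.0"
JAR_TEMPLATE = "beam-sdks-java-extensions-schemaio-expansion-service-{version}.jar"


def cached_jar_path(version=BEAM_VERSION, cache_dir=None):
    """Return the path where the expansion service jar is expected.

    Hypothetical helper: defaults to ~/.apache_beam/cache/jars, the
    directory mentioned in the question.
    """
    if cache_dir is None:
        cache_dir = Path.home() / ".apache_beam" / "cache" / "jars"
    return Path(cache_dir) / JAR_TEMPLATE.format(version=version)


def describe_jar(path):
    """Report whether the jar exists and, if so, its size in bytes."""
    path = Path(path)
    if not path.is_file():
        return f"missing: {path}"
    return f"found: {path} ({path.stat().st_size} bytes)"
```

Calling print(describe_jar(cached_jar_path())) reports whether the jar is where the log below says it is being loaded from. In my case the log already confirms the cached jar is picked up, so this check mainly helps when the path differs between environments.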

The expansion service starts, but expanding the transform fails with the exception below.

INFO:apache_beam.utils.subprocess_server:Using cached job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-extensions-schemaio-expansion-service/2.43.0/beam-sdks-java-extensions-schemaio-expansion-service-2.43.0.jar
INFO:root:Starting a JAR-based expansion service from JAR /root/.apache_beam/cache/jars/beam-sdks-java-extensions-schemaio-expansion-service-2.43.0.jar and with classpath: ['/tmp/csjdbc.jar', '/tmp/csjdbc8.jar']
INFO:apache_beam.utils.subprocess_server:Starting service with ['java' '-jar' '/root/.apache_beam/cache/jars/composite-jars/98d690ef1333b1772b8aed00367bf3f080801f78cf284a6dfcbafd11cfd5fac8.jar' '47111' '--filesToStage=/root/.apache_beam/cache/jars/beam-sdks-java-extensions-schemaio-expansion-service-2.43.0.jar,/tmp/csjdbc.jar,/tmp/csjdbc8.jar']
INFO:apache_beam.utils.subprocess_server:Starting expansion service at localhost:47111
INFO:apache_beam.utils.subprocess_server:May 04, 2023 5:24:17 AM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
INFO:apache_beam.utils.subprocess_server:INFO: Registering external transforms: [beam:transform:org.apache.beam:schemaio_jdbc_read:v1, beam:transform:org.apache.beam:schemaio_jdbc_write:v1, beam:transform:org.apache.beam:schemaio_avro_read:v1, beam:transform:org.apache.beam:schemaio_avro_write:v1, beam:external:java:generate_sequence:v1]
INFO:apache_beam.utils.subprocess_server:       beam:transform:org.apache.beam:schemaio_jdbc_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@72057ecf
INFO:apache_beam.utils.subprocess_server:       beam:transform:org.apache.beam:schemaio_jdbc_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@1afd44cb
INFO:apache_beam.utils.subprocess_server:       beam:transform:org.apache.beam:schemaio_avro_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@6973b51b
INFO:apache_beam.utils.subprocess_server:       beam:transform:org.apache.beam:schemaio_avro_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@1ab3a8c8
INFO:apache_beam.utils.subprocess_server:       beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@43195e57
INFO:apache_beam.utils.subprocess_server:May 04, 2023 5:24:18 AM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO:apache_beam.utils.subprocess_server:INFO: Expanding 'Read Records from JDBC' with URN 'beam:transform:org.apache.beam:schemaio_jdbc_read:v1'
INFO:apache_beam.utils.subprocess_server:May 04, 2023 5:24:18 AM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig
INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class 'org.apache.beam.sdk.extensions.schemaio.expansion.ExternalSchemaIOTransformRegistrar$Configuration' has no schema registered. Attempting to construct with setter approach.
Traceback (most recent call last):
  File "TestReadFromJDBC.py", line 505, in <module>
    run(
  File "TestReadFromJDBC.py", line 371, in run
    p
  File "/usr/local/lib/python3.8/site-packages/apache_beam/pvalue.py", line 137, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/pipeline.py", line 652, in apply
    return self.apply(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/pipeline.py", line 663, in apply
    return self.apply(transform, pvalueish)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/pipeline.py", line 709, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 142, in apply
    return super().apply(transform, input, options)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 185, in apply
    return m(transform, input, options)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 215, in apply_PTransform
    return transform.expand(input)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/transforms/external.py", line 526, in expand
    raise RuntimeError(response.error)
RuntimeError: java.lang.RuntimeException: Failed to get dependencies of beam:transform:org.apache.beam:schemaio_jdbc_read:v1 from spec urn: "beam:transform:org.apache.beam:schemaio_jdbc_read:v1"
payload: "PAYLOAD_REPACED"

        at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1.getDependencies(ExpansionService.java:166)
        at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:512)
        at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:596)
        at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:220)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:354)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1623)
Caused by: java.lang.UnsupportedOperationException: Cannot define class using reflection: Unable to make protected java.lang.Package java.lang.ClassLoader.getPackage(java.lang.String) accessible: module java.base does not "opens java.lang" to unnamed module @37617b37
        at net.bytebuddy.dynamic.loading.ClassInjector$UsingReflection$Dispatcher$Initializable$Unavailable.defineClass(ClassInjector.java:472)
        at net.bytebuddy.dynamic.loading.ClassInjector$UsingReflection.injectRaw(ClassInjector.java:284)
        at net.bytebuddy.dynamic.loading.ClassInjector$AbstractBase.inject(ClassInjector.java:118)
        at net.bytebuddy.dynamic.loading.ClassLoadingStrategy$Default$InjectionDispatcher.load(ClassLoadingStrategy.java:241)
        at net.bytebuddy.dynamic.loading.ClassLoadingStrategy$Default.load(ClassLoadingStrategy.java:148)
        at net.bytebuddy.dynamic.TypeResolutionStrategy$Passive.initialize(TypeResolutionStrategy.java:101)
        at net.bytebuddy.dynamic.DynamicType$Default$Unloaded.load(DynamicType.java:6317)
        at org.apache.beam.sdk.coders.RowCoderGenerator.generate(RowCoderGenerator.java:172)
        at org.apache.beam.sdk.schemas.SchemaCoder.getDelegateCoder(SchemaCoder.java:117)
        at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:129)
        at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.decodeConfigObjectRow(ExpansionService.java:232)
        at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.payloadToConfigSetters(ExpansionService.java:295)
        at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.payloadToConfig(ExpansionService.java:261)
        at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1.getDependencies(ExpansionService.java:158)
        ... 11 more
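The "Caused by" line complains that module java.base does not open java.lang to the unnamed module. JDKs from version 16 onward enforce this kind of encapsulation by default, so one thing worth checking is which Java version the expansion service is actually launched with. The sketch below parses the output of java -version; the function names are hypothetical, and whether a different JDK resolves this particular failure is an assumption to verify against the Java versions your Beam release supports.

```python
import re
import subprocess


def parse_java_major(version_output):
    """Extract the major Java version from `java -version` output.

    Handles the legacy scheme ("1.8.0_292" -> 8) and the current one
    ("11.0.19" -> 11, "20" -> 20). Returns None if no version string
    is found.
    """
    match = re.search(r'version "(\d+)(?:\.(\d+))?', version_output)
    if match is None:
        return None
    first = int(match.group(1))
    # Legacy versions are written as 1.<major>, e.g. 1.8 for Java 8.
    if first == 1 and match.group(2) is not None:
        return int(match.group(2))
    return first


def installed_java_major(java_cmd="java"):
    """Run `java -version` (it prints to stderr) and parse the result.

    Raises FileNotFoundError if the java executable is not on PATH.
    """
    result = subprocess.run(
        [java_cmd, "-version"], capture_output=True, text=True
    )
    return parse_java_major(result.stderr + result.stdout)
```

If installed_java_major() returns 16 or higher, the java binary on PATH is one that enforces module encapsulation by default, which would be consistent with the reflection error above. It does not prove that the JDK is the cause.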