I am trying to read data from TIBCO Data Virtualization (TDV) using the ReadFromJdbc transform from the apache_beam.io.jdbc module: https://beam.apache.org/releases/pydoc/2.43.0/_modules/apache_beam/io/jdbc.html#ReadFromJdbc
# Row type with a single integer field, "column1"; RowCoder is registered as
# its coder in Beam's coder registry.
JdbcTestRow = typing.NamedTuple("JdbcTestRow", [("column1", int)])
coders.registry.register_coder(JdbcTestRow, coders.RowCoder)

with beam.Pipeline(options=pipeline_options) as p:
    raw_records = (
        p
        # NOTE(review): Beam's documented ReadFromJdbc examples apply the
        # transform directly to the pipeline object; confirm that this
        # preceding Create step is actually required.
        | "Initialise" >> beam.Create(["Initialize"])
        | 'Read Records from JDBC' >> ReadFromJdbc(
            table_name='test_table',
            jdbc_url=jdbc_conn_url,
            driver_class_name=driver_class_name,
            # Extra jars handed to the expansion service; presumably the
            # TDV JDBC driver jars -- verify against the driver distribution.
            classpath=["/tmp/csjdbc.jar", "/tmp/csjdbc8.jar"],
            username=sql_username,
            password=sql_password,
            query=sql_query,
            fetch_size=1000,
        )
    )
I have downloaded the expansion service jar beam-sdks-java-extensions-schemaio-expansion-service-2.43.0.jar and copied it into the Apache Beam cache directory ~/.apache_beam/cache/jars.
The expansion service starts, but the pipeline then fails with the exception below.
INFO:apache_beam.utils.subprocess_server:Using cached job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-extensions-schemaio-expansion-service/2.43.0/beam-sdks-java-extensions-schemaio-expansion-service-2.43.0.jar
INFO:root:Starting a JAR-based expansion service from JAR /root/.apache_beam/cache/jars/beam-sdks-java-extensions-schemaio-expansion-service-2.43.0.jar and with classpath: ['/tmp/csjdbc.jar', '/tmp/csjdbc8.jar']
INFO:apache_beam.utils.subprocess_server:Starting service with ['java' '-jar' '/root/.apache_beam/cache/jars/composite-jars/98d690ef1333b1772b8aed00367bf3f080801f78cf284a6dfcbafd11cfd5fac8.jar' '47111' '--filesToStage=/root/.apache_beam/cache/jars/beam-sdks-java-extensions-schemaio-expansion-service-2.43.0.jar,/tmp/csjdbc.jar,/tmp/csjdbc8.jar']
INFO:apache_beam.utils.subprocess_server:Starting expansion service at localhost:47111
INFO:apache_beam.utils.subprocess_server:May 04, 2023 5:24:17 AM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
INFO:apache_beam.utils.subprocess_server:INFO: Registering external transforms: [beam:transform:org.apache.beam:schemaio_jdbc_read:v1, beam:transform:org.apache.beam:schemaio_jdbc_write:v1, beam:transform:org.apache.beam:schemaio_avro_read:v1, beam:transform:org.apache.beam:schemaio_avro_write:v1, beam:external:java:generate_sequence:v1]
INFO:apache_beam.utils.subprocess_server: beam:transform:org.apache.beam:schemaio_jdbc_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@72057ecf
INFO:apache_beam.utils.subprocess_server: beam:transform:org.apache.beam:schemaio_jdbc_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@1afd44cb
INFO:apache_beam.utils.subprocess_server: beam:transform:org.apache.beam:schemaio_avro_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@6973b51b
INFO:apache_beam.utils.subprocess_server: beam:transform:org.apache.beam:schemaio_avro_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@1ab3a8c8
INFO:apache_beam.utils.subprocess_server: beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@43195e57
INFO:apache_beam.utils.subprocess_server:May 04, 2023 5:24:18 AM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO:apache_beam.utils.subprocess_server:INFO: Expanding 'Read Records from JDBC' with URN 'beam:transform:org.apache.beam:schemaio_jdbc_read:v1'
INFO:apache_beam.utils.subprocess_server:May 04, 2023 5:24:18 AM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig
INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class 'org.apache.beam.sdk.extensions.schemaio.expansion.ExternalSchemaIOTransformRegistrar$Configuration' has no schema registered. Attempting to construct with setter approach.
Traceback (most recent call last):
File "TestReadFromJDBC.py", line 505, in <module>
run(
File "TestReadFromJDBC.py", line 371, in run
p
File "/usr/local/lib/python3.8/site-packages/apache_beam/pvalue.py", line 137, in __or__
return self.pipeline.apply(ptransform, self)
File "/usr/local/lib/python3.8/site-packages/apache_beam/pipeline.py", line 652, in apply
return self.apply(
File "/usr/local/lib/python3.8/site-packages/apache_beam/pipeline.py", line 663, in apply
return self.apply(transform, pvalueish)
File "/usr/local/lib/python3.8/site-packages/apache_beam/pipeline.py", line 709, in apply
pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 142, in apply
return super().apply(transform, input, options)
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 185, in apply
return m(transform, input, options)
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 215, in apply_PTransform
return transform.expand(input)
File "/usr/local/lib/python3.8/site-packages/apache_beam/transforms/external.py", line 526, in expand
raise RuntimeError(response.error)
RuntimeError: java.lang.RuntimeException: Failed to get dependencies of beam:transform:org.apache.beam:schemaio_jdbc_read:v1 from spec urn: "beam:transform:org.apache.beam:schemaio_jdbc_read:v1"
payload: "PAYLOAD_REPACED"
at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1.getDependencies(ExpansionService.java:166)
at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:512)
at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:596)
at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:220)
at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:354)
at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1623)
Caused by: java.lang.UnsupportedOperationException: Cannot define class using reflection: Unable to make protected java.lang.Package java.lang.ClassLoader.getPackage(java.lang.String) accessible: module java.base does not "opens java.lang" to unnamed module @37617b37
at net.bytebuddy.dynamic.loading.ClassInjector$UsingReflection$Dispatcher$Initializable$Unavailable.defineClass(ClassInjector.java:472)
at net.bytebuddy.dynamic.loading.ClassInjector$UsingReflection.injectRaw(ClassInjector.java:284)
at net.bytebuddy.dynamic.loading.ClassInjector$AbstractBase.inject(ClassInjector.java:118)
at net.bytebuddy.dynamic.loading.ClassLoadingStrategy$Default$InjectionDispatcher.load(ClassLoadingStrategy.java:241)
at net.bytebuddy.dynamic.loading.ClassLoadingStrategy$Default.load(ClassLoadingStrategy.java:148)
at net.bytebuddy.dynamic.TypeResolutionStrategy$Passive.initialize(TypeResolutionStrategy.java:101)
at net.bytebuddy.dynamic.DynamicType$Default$Unloaded.load(DynamicType.java:6317)
at org.apache.beam.sdk.coders.RowCoderGenerator.generate(RowCoderGenerator.java:172)
at org.apache.beam.sdk.schemas.SchemaCoder.getDelegateCoder(SchemaCoder.java:117)
at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:129)
at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.decodeConfigObjectRow(ExpansionService.java:232)
at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.payloadToConfigSetters(ExpansionService.java:295)
at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.payloadToConfig(ExpansionService.java:261)
at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1.getDependencies(ExpansionService.java:158)
... 11 more