Dataflow jTDS SQL Server Validation Query

I'm currently trying to connect to SQL Server from a Dataflow job using the JdbcIO step of a pipeline. I am getting the following error:

2022-12-21T15:06:16.7965222Z SEVERE: 2022-12-21T15:04:31.071Z: java.lang.AbstractMethodError
    2022-12-21T15:06:16.7966124Z    at net.sourceforge.jtds.jdbc.JtdsConnection.isValid(JtdsConnection.java:2833)
    2022-12-21T15:06:16.7967191Z    at org.apache.commons.dbcp2.DelegatingConnection.isValid(DelegatingConnection.java:895)
    2022-12-21T15:06:16.7968254Z    at org.apache.commons.dbcp2.PoolableConnection.validate(PoolableConnection.java:273)
    2022-12-21T15:06:16.7969451Z    at org.apache.commons.dbcp2.PoolableConnectionFactory.validateConnection(PoolableConnectionFactory.java:644)
    2022-12-21T15:06:16.7970625Z    at org.apache.commons.dbcp2.BasicDataSource.validateConnectionFactory(BasicDataSource.java:106)
    2022-12-21T15:06:16.7971833Z    at org.apache.commons.dbcp2.BasicDataSource.createPoolableConnectionFactory(BasicDataSource.java:652)
    2022-12-21T15:06:16.7973522Z    at org.apache.commons.dbcp2.BasicDataSource.createDataSource(BasicDataSource.java:534)
    2022-12-21T15:06:16.7974830Z    at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:734)
    2022-12-21T15:06:16.7976091Z    at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement(JdbcIO.java:1354)

I have found online that I need to set a validation query on the data source, but I don't know how to do that from within the Dataflow job. Below is the code I'm using to set up the connection. Can I add a validation query? Is there another workaround?

PCollection<TableRow> rows = pipeline
            .apply("Connect", JdbcIO.<TableRow>read()
                    .withDataSourceConfiguration(
                            buildDataSourceConfig(options, URL))
                    .withQuery(query)
                    .withRowMapper(new JdbcIO.RowMapper<TableRow>() {
                      // Convert each ResultSet row into a TableRow
                      @Override
                      public TableRow mapRow(ResultSet rs) throws Exception {
                        String ipAddress = rs.getString("IP");
                        return trOf(ipAddress);
                      }
                    }));

private static DataSourceConfiguration buildDataSourceConfig(Options options, String url) {
    // user and pass are not defined in this snippet; presumably they are set elsewhere
    return DataSourceConfiguration
            .create("net.sourceforge.jtds.jdbc.Driver", url)
            .withUsername(user)
            .withPassword(pass);
  }

Answered by siggemannen

You could create the DataSource yourself and pass it to the DataSourceConfiguration.create(DataSource) factory method (https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/io/jdbc/JdbcIO.DataSourceConfiguration.html).

When creating the DataSource, you should be able to set a validation query:

https://commons.apache.org/proper/commons-dbcp/apidocs/org/apache/commons/dbcp2/BasicDataSource.html#setValidationQuery-java.lang.String-
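A minimal sketch of that suggestion, assuming commons-dbcp2 on the classpath and a Beam version whose JdbcIO.Read offers withDataSourceProviderFn(SerializableFunction<Void, DataSource>). As far as I know, BasicDataSource is not Serializable, and DataSourceConfiguration.create(DataSource) expects a serializable DataSource, so this hypothetical JtdsDataSourceProvider ships only the connection settings and builds the pool lazily on each worker. When a validation query is set, DBCP2's PoolableConnection.validate executes that query instead of calling Connection.isValid(), which is where jTDS throws the AbstractMethodError in the trace above. "SELECT 1" is an assumed query; any cheap statement SQL Server accepts should work.

```java
import javax.sql.DataSource;

import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.commons.dbcp2.BasicDataSource;

/**
 * Hypothetical provider: builds a DBCP2 pool for the jTDS driver with a
 * validation query set. Only the connection settings are serialized; the pool
 * itself is created the first time apply() is called on a worker.
 */
public class JtdsDataSourceProvider implements SerializableFunction<Void, DataSource> {
  private static final long serialVersionUID = 1L;

  private final String url;
  private final String user;
  private final String password;

  // transient: the pool is not serialized with the function, only rebuilt on demand.
  private transient BasicDataSource dataSource;

  public JtdsDataSourceProvider(String url, String user, String password) {
    this.url = url;
    this.user = user;
    this.password = password;
  }

  @Override
  public synchronized DataSource apply(Void input) {
    if (dataSource == null) {
      BasicDataSource ds = new BasicDataSource();
      ds.setDriverClassName("net.sourceforge.jtds.jdbc.Driver");
      ds.setUrl(url);
      ds.setUsername(user);
      ds.setPassword(password);
      // With a non-empty validation query, DBCP2 validates connections by running
      // this statement rather than calling Connection.isValid() on the jTDS connection.
      ds.setValidationQuery("SELECT 1");
      dataSource = ds;
    }
    // No connection is opened here; DBCP2 connects on the first getConnection().
    return dataSource;
  }
}
```

In the question's pipeline, the .withDataSourceConfiguration(buildDataSourceConfig(options, URL)) call would then be replaced by .withDataSourceProviderFn(new JtdsDataSourceProvider(URL, user, pass)), leaving the query and row mapper unchanged.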