Apache Beam Python SDK - Reading from Postgres using JDBC io

337 views Asked by At

I am looking for resources on how to read/write data from/to Postgres using Beam Python SDK.So far, I have learned that apache_beam.io.jdbc is our best bet (let me know if I there's a better alternative).

I tried using it, and it was able to handle primitive data types, like integer and strings. However, it could not handle LogicalTypes like "timestamp without time zone" type from Postgres.

Here's some details on my little experiment. Appreciate any help!

Python v3.11.4
apache-beam v2.51.0 (Python SDK)
postgres v11.5
DirectRunner

Here is the pipeline code:

with beam.Pipeline(options=None) as p:
    pipeline = (
        p
        | ReadFromJdbc(
            table_name="table_name",
            driver_class_name='org.postgresql.Driver',
            jdbc_url='jdbc:{}://{}:{}/{}'.format("postgresql", "127.0.0.1", "5432", "db_name"),
            username="postgres",
            password="redacted",
            query="SELECT * FROM table_name")
        | beam.Map(print)
    )

And it runs into this below error when trying to parse "timestamp without time zone" column. My understanding is the LogicalType MicrosInstant is not able to parse the timestamp. I can confirm the value of my timestamp field is not NULL.

File "apache_beam/coders/coder_impl.py", line 1890, in apache_beam.coders.coder_impl.LogicalTypeCoderImpl.decode_from_stream
  File "/Users/archit.shah/PycharmProjects/duplopy-pysql-beam/venv-3.11/lib/python3.11/site-packages/apache_beam/typehints/schemas.py", line 873, in to_language_type
    return Timestamp(seconds=int(value.seconds), micros=int(value.micros))
                             ^^^^^^^^^^^^^^^^^^
TypeError: int() argument must be a string, a bytes-like object or a real number, not 'NoneType'

I also see this below Java warning:

WARNING:root:severity: WARN
timestamp {
  seconds: 1697642143
  nanos: 162000000
}
message: "Hanged up for url: \"host.docker.internal:58970\"\n."
log_location: "org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer"
thread: "16"

May be I am missing something? I also tried registering a coder, but same result.

1

There are 1 answers

2
Yi Hu On BEST ANSWER

Add this two lines before pipeline creation should resolve the issue

from apache_beam.typehints.schemas import MillisInstant

LogicalType.register_logical_type(MillisInstant)

This was due to Java JdbcIO is using joda timestamp. Until https://github.com/apache/beam/issues/28359 has been resolved, this workaround is needed