Apache Beam Streaming Write/Read BigQuery


I'm running a streaming pipeline where I write to BigQuery and then read from it. Is there a way to be sure that what I've just written is there before reading? I'm using Python:

write_result_bigquery = (
    read_pubsub
    | f'Deserialize {TABLE_NAME.capitalize()} JSON' >> beam.Map(json.loads)
    | 'Add Timestamp Field' >> beam.ParDo(AddTimestamp())
    | 'Log writing' >> beam.ParDo(LogResult())
    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
        f'{PROJECT_ID}:{DATASET}.{TABLE_NAME}',
        schema=table_schema,
        custom_gcs_temp_location=f'gs://{GCS_BUCKET}/tmpWrite/tmp',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
    )
)
_ = write_result_bigquery
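
For reference, AddTimestamp and LogResult are plain DoFns; a simplified sketch of that kind of transform (the field name and log message here are placeholders, not the exact code):

import logging
from datetime import datetime, timezone

import apache_beam as beam


class AddTimestamp(beam.DoFn):
    """Simplified sketch: add an ingestion timestamp field to each element."""
    def process(self, element):
        element['ingestion_timestamp'] = datetime.now(timezone.utc).isoformat()
        yield element


class LogResult(beam.DoFn):
    """Simplified sketch: log each element before it is written."""
    def process(self, element):
        logging.info('Writing to BigQuery: %s', element)
        yield element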

# 2. READ DATA > LAST PROCESSED TIMESTAMP
read_bigquery_timestamp = (
    read_pubsub
    | 'Get data' >> beam.Map(read_timestamp_file, GCS_BUCKET, source, timestamp_key_cons_mobile)
    | 'Get timestamp' >> beam.Map(get_timestamp, key=timestamp_key_cons_mobile)
    | 'Create query input' >> beam.Map(generate_query_timestamp, query_ts, PROJECT_ID, DATASET, [TABLE_NAME], [id_col])
    | 'Read BigQuery by timestamp' >> beam.ParDo(ReadFromBigQueryRetryDoFn(), schema=schema_output)
)

For now I've implemented a custom method that throws an exception when the query returns empty results, but it's not efficient and it causes other issues: it just forces Dataflow streaming to retry until it finds something.
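
Something along these lines (simplified sketch, not the exact code):

class ReadFromBigQueryRetryDoFn(beam.DoFn):
    """Simplified sketch: query BigQuery and fail the bundle when nothing is found."""
    def process(self, query, schema=None):
        # Local import so the DoFn stays picklable on Dataflow workers.
        from google.cloud import bigquery
        client = bigquery.Client()
        rows = list(client.query(query).result())
        if not rows:
            # Raising forces the streaming runner to retry the bundle
            # until the freshly written rows become visible.
            raise ValueError('Query returned no rows yet')
        for row in rows:
            yield dict(row.items())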

Any ideas?

1 Answer

Joe Moore

For a streaming pipeline, the default method that Apache Beam's built-in BigQuery I/O uses is the legacy streaming inserts API, which can withhold access to newly written data for up to 2 minutes.
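
In other words, leaving method unset on a streaming write is effectively the same as requesting streaming inserts explicitly. A sketch for contrast (other arguments omitted):

| 'Write to BigQuery (streaming inserts)' >> beam.io.WriteToBigQuery(
    f'{PROJECT_ID}:{DATASET}.{TABLE_NAME}',
    schema=table_schema,
    # This is what an unbounded/streaming write falls back to by default.
    method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
)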

To be able to query the data sooner, the newer Storage Write API should be used instead, as follows:

| "Write to BigQuery"
    >> beam.io.WriteToBigQuery(
        f"{PROJECT_ID}:{DATASET}.{TABLE_NAME}",
        schema=table_schema,
        custom_gcs_temp_location=f"gs://{GCS_BUCKET}/tmpWrite/tmp",
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR,
        method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API
    )

The Storage Write API buffers the newly written rows and commits them to the table in batches, which lets you access new data from the buffer almost immediately. This is the recommended way of writing to BigQuery from Dataflow and should work for you.
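
If you want to sanity-check visibility yourself, a quick way outside the pipeline is to query the table directly with the BigQuery client. A sketch, reusing the variable names from the question:

from google.cloud import bigquery

client = bigquery.Client(project=PROJECT_ID)
sql = f'SELECT COUNT(*) AS n FROM `{PROJECT_ID}.{DATASET}.{TABLE_NAME}`'
count = next(iter(client.query(sql).result())).n
print(f'Rows currently visible: {count}')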