Why is my Python BigQuery Dataflow sink not inserting records into the database?


I'm using Python (2.7) and working within Google's Dataflow environment. Needless to say, Google hasn't fully fleshed everything out yet, and the documentation isn't quite sufficient. However, the portion for writing from Dataflow to BigQuery is documented here: BigQuery Sink.

According to the documentation, in order to specify the schema, you need to input a string:

schema = 'field_1:STRING, field_2:STRING, field_3:STRING, created_at:TIMESTAMP, updated_at:TIMESTAMP, field_4:STRING, field_5:STRING'

The table name, project ID and dataset ID are like this: 'example_project_id:example_dataset_id.example_table_name'
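For reference, a minimal sketch of two ways I believe the sink can be pointed at that table (either the fully qualified spec, or separate dataset/project keyword arguments; all names here are placeholders):

import apache_beam as beam

# Fully qualified table spec: 'project_id:dataset_id.table_name'
sink = beam.io.BigQuerySink(
    'example_project_id:example_dataset_id.example_table_name',
    schema=schema)

# Equivalent form with the pieces passed separately
sink = beam.io.BigQuerySink(
    'example_table_name',
    dataset='example_dataset_id',
    project='example_project_id',
    schema=schema)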

Now, all of that is working (see the code below); from what I can see, it successfully creates the table and the fields. Note: the project ID is set as part of the arguments for the function.

bq_data | beam.io.Write(
    "Write to BQ", beam.io.BigQuerySink(
        'example_dataset_id.{}'.format(bq_table_name),
        schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
    )
)

Now, it looks like I can get things inserting by using this:

bq_data = pipeline | beam.Create(
    [{
        'field_1': 'ExampleIdentifier',
        'field_2': 'ExampleValue',
        'field_3': 'ExampleFieldValue',
        'created_at': '2016-12-26T05:50:39Z',
        'updated_at': '2016-12-26T05:50:39Z',
        'field_4': 'ExampleDataIdentifier',
        'field_5: 'ExampleData'
    }]
)

But for some reason, when I pack values into a PCollection, the pipeline reports that it inserts into BigQuery, yet when I query the table, it shows nothing.

Why isn't it inserting? I don't see any errors, yet nothing is inserted into BigQuery.

This is what the data contained in the PCollection looks like; I have close to 1,100 rows to insert:

{'field_1': 'ExampleIdentifier', 'field_2': 'ExampleValue', 'field_3': 'ExampleFieldValue', 'created_at': '2016-12-29 12:10:32', 'updated_at': '2016-12-29 12:10:32', 'field_4': 'ExampleDataIdentifier', 'field_5': 'ExampleData'}

Note: I checked the date formatting, and the format above is allowed for BigQuery insertion.
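As an aside, here's a quick sketch (my own illustration, not the actual pipeline code) of producing both timestamp string formats used above; as far as I can tell BigQuery accepts either one for a TIMESTAMP column:

from datetime import datetime

now = datetime.utcnow()
created_at = now.strftime('%Y-%m-%d %H:%M:%S')   # e.g. '2016-12-29 12:10:32'
updated_at = now.strftime('%Y-%m-%dT%H:%M:%SZ')  # e.g. '2016-12-26T05:50:39Z'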


There are 2 answers

chamikara (BEST ANSWER)

I tried an example with your exact schema and input and it worked for me. I had to make the following fixes:

(1) It seems you are not specifying a project in your arguments. You might be specifying it within your pipeline definition, since you are not seeing an error for this; a sketch of how it is usually passed follows below.

(2) There is a typo in the code you posted above: 'field_5: 'ExampleData' should be 'field_5': 'ExampleData'. But I'm assuming this is just a typo in the question, not in your original pipeline, since you are not getting an error for it.
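As a rough sketch (placeholder values), this is how the project is usually supplied through the pipeline arguments:

import apache_beam as beam

# Placeholder project ID and local runner; pass these to the pipeline so
# the BigQuery sink has a project to work against.
pipeline_args = [
    '--project=example_project_id',
    '--runner=DirectPipelineRunner',
]
pipeline = beam.Pipeline(argv=pipeline_args)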

Are you running the latest version of the Dataflow SDK? You can try creating a new virtual environment and running 'pip install google-cloud-dataflow' to install the latest version.

Is it possible to share your full pipeline for me to try out?

It's hard to debug this remotely since you are using 'DirectPipelineRunner'. Is it possible to try running the same pipeline using 'DataflowPipelineRunner' (note that you'll need a GCP project with billing enabled for this)? I will be able to view the logs if you run this using 'DataflowPipelineRunner' and provide a job ID.
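For reference, a rough sketch of the arguments needed to run on the service rather than locally (the project, bucket, and job name are placeholders):

import apache_beam as beam

# Running on the Dataflow service requires GCS staging and temp locations.
pipeline_args = [
    '--project=example_project_id',
    '--runner=DataflowPipelineRunner',
    '--staging_location=gs://example-bucket/staging',
    '--temp_location=gs://example-bucket/temp',
    '--job_name=bq-sink-debug',
]
pipeline = beam.Pipeline(argv=pipeline_args)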

TalkDataToMe

This answer is pretty late, but maybe it'll help someone else. Your write statement in the pipeline is written incorrectly.

bq_data | 'Write to BigQuery' >> beam.io.Write(
    beam.io.BigQuerySink(
        known_args.output_table,
        schema=schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        # WRITE_TRUNCATE will overwrite whatever is already in your table
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
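For context, here's a minimal end-to-end sketch of how that corrected write fits together (the --output_table argument, placeholder names, and pipeline wiring are my own reconstruction from the snippets in this post, not the asker's actual code):

import argparse
import apache_beam as beam

# Parse the table spec; everything else is passed through to the pipeline.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--output_table',
    default='example_project_id:example_dataset_id.example_table_name')
known_args, pipeline_args = parser.parse_known_args()

schema = ('field_1:STRING, field_2:STRING, field_3:STRING, '
          'created_at:TIMESTAMP, updated_at:TIMESTAMP, '
          'field_4:STRING, field_5:STRING')

pipeline = beam.Pipeline(argv=pipeline_args)

bq_data = pipeline | beam.Create([{
    'field_1': 'ExampleIdentifier',
    'field_2': 'ExampleValue',
    'field_3': 'ExampleFieldValue',
    'created_at': '2016-12-26T05:50:39Z',
    'updated_at': '2016-12-26T05:50:39Z',
    'field_4': 'ExampleDataIdentifier',
    'field_5': 'ExampleData',
}])

bq_data | 'Write to BigQuery' >> beam.io.Write(beam.io.BigQuerySink(
    known_args.output_table,
    schema=schema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

pipeline.run()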