Why is Eventarc generating 2 events for every insert when listening for insertJob on BigQuery?


I am working on a use case where I need to trigger a DAG when records are inserted into a BigQuery table. I am using Eventarc and listening for the insertJob event that Eventarc provides for BigQuery. It works almost fine, but I get 2 events whenever I insert records. An event is also generated when I query the table, and the DAG is triggered 2 times.

This is my Eventarc setting: [image]


There are 2 answers

Paul Fentress On

I ran into the exact same issue and was stumped for a while.

I followed something similar to what Guillaume suggests; however, I could not find any info about dry runs in my logs. These are the steps I used to resolve the issue when using Eventarc to trigger a GCP Cloud Function. I am not sure whether the same method would work for Cloud Run or other possible event triggers.

  1. Print out the event payload within the cloud function and re-trigger the event in BigQuery (add a new row).
     print("Full payload: {}".format(payload))
  2. Go to your GCP Cloud Function and select "Logs" in the top bar. Since your function is being triggered twice, you should find two log entries that start with "Full payload: ..."

Image of Logs

  3. When inspecting both payloads, I found that they had some minor differences. Bingo, now we know how to filter them. For me, they had different metadata: one was for a table data read and one was for a table data change. Both trigger google.cloud.bigquery.v2.JobService.InsertJob and therefore cause the duplicates, but only one of them actually loads the data into BigQuery. That explains why I got two triggers but only one inserted row.

  4. Solution: Now that I knew the difference between the duplicate calls, I added this inside my cloud function:


import functions_framework


@functions_framework.cloud_event
def your_cloud_function(cloudevent):
    # Extract the audit log payload from the cloud event data.
    payload = cloudevent.data.get("protoPayload", {})

    # Extract the metadata; this is where the duplicate events differ.
    metadata = payload.get("metadata", {})

    # Only the event that carries a table data change should run the logic.
    if metadata.get("tableDataChange"):
        # Do the logic that was being duplicated before.
        print("HELLO WORLD.")
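
The same check can also be pulled into a small helper, which makes it possible to try the filter locally without deploying the function. This is only a minimal sketch, assuming the event data is a dict shaped like the payloads described above. The helper name `is_table_data_change` and the two trimmed sample payloads are hypothetical and are not taken from real logs.

```python
from typing import Any, Mapping


def is_table_data_change(event_data: Mapping[str, Any]) -> bool:
    """Return True only when the audit log metadata reports a table data change."""
    # protoPayload or metadata may be missing, so fall back to empty dicts.
    payload = event_data.get("protoPayload") or {}
    metadata = payload.get("metadata") or {}
    return bool(metadata.get("tableDataChange"))


# Hypothetical, heavily trimmed payloads for illustration only.
read_event = {"protoPayload": {"metadata": {"tableDataRead": {"reason": "JOB"}}}}
change_event = {
    "protoPayload": {"metadata": {"tableDataChange": {"insertedRowsCount": "1"}}}
}

print(is_table_data_change(read_event))    # False
print(is_table_data_change(change_event))  # True
```

Inside the cloud function, the helper would be called with `cloudevent.data`, and the duplicated logic would run only when it returns True.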

guillaume blaquiere On

Your Eventarc configuration works well. When you run a manual query in the UI, you get at least 2 insertJob entries.

Let's have a deeper look. You have this first: [image]

Then this: [image]

Focus your attention on the last lines. You can see a "dryrun" attribute.

Indeed, in the UI, a first dry-run query is performed to validate the query and to get the bytes billed value (the volume of data processed by the query, displayed in the upper right corner).

Therefore there are 2 insert jobs: one with dry run and one without (the real query execution).


That being said, you have to check in your Cloud Function whether the dry run parameter is set in the event body.
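
As a rough illustration of that check, here is a minimal sketch. The exact location of the dry run attribute inside the event body is not shown in this answer, so the sketch makes an assumption: it searches the whole body recursively for a key named "dryRun" (compared case-insensitively, so "dryrun" matches too) whose value is true. The helper name `has_dry_run_flag` and the sample bodies are hypothetical.

```python
from typing import Any


def has_dry_run_flag(node: Any) -> bool:
    """Recursively look for a key named 'dryRun' (any casing) set to True."""
    if isinstance(node, dict):
        for key, value in node.items():
            # Assumption: the flag is a JSON boolean somewhere in the body.
            if str(key).lower() == "dryrun" and value is True:
                return True
            if has_dry_run_flag(value):
                return True
    elif isinstance(node, list):
        return any(has_dry_run_flag(item) for item in node)
    return False


# Hypothetical event bodies; the nesting is invented for illustration.
dry_run_body = {"protoPayload": {"request": {"configuration": {"dryRun": True}}}}
real_run_body = {"protoPayload": {"request": {"configuration": {"query": "SELECT 1"}}}}

print(has_dry_run_flag(dry_run_body))   # True
print(has_dry_run_flag(real_run_body))  # False
```

In the function, an event for which this returns True would simply be skipped. Once the real path to the attribute is confirmed from the logs, a direct lookup is preferable to a recursive search.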