Handling runtime parameters as strings - Google Cloud Dataflow - Creating a classic template (Python SDK)


I am able to run a custom pipeline whenever I call the .py file directly, using arguments parsed by the argparse library. However, when I try to turn those arguments into runtime arguments, it doesn't work. Here is a sample of the code as a standalone pipeline:

import argparse
import logging
import datetime,os
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import os



def get_data(dataset,dateBegin,dateEnd):
    """
    given 'DATASET','2020-10-01','2020-10-31'
    returns query for getting data

    """
    query= '''

    SELECT
    DISTINCT
    IFNULL(a,
        b) AS c FROM
    `myproject.'''+dataset+'''.commonTable_'''+dataset+'''`
    WHERE
    date BETWEEN "'''+dateBegin+'''" and "'''+dateEnd+'''"

    '''

    return query


def replacing(item, anondict=[]):
    if not anondict:  # nothing to redact; an empty pattern would match everywhere
        return item
    return re.sub("(?i)" + "|".join("(" + anon + ")" for anon in anondict), "[REDACT]", item)


# Define pipeline runner
def run():

    # Command line arguments
    parser = argparse.ArgumentParser(description='Run the flow')
    parser.add_argument('--project', required=True, default='myproject')
    parser.add_argument('--bucket', required=True, default='abucket')
    parser.add_argument('--dataset', required=True)
    parser.add_argument('--dateBegin', required=True)
    parser.add_argument('--dateEnd', required=True)
    parser.add_argument('--anondict')

    opts = parser.parse_args()


    if opts.anondict is None:
        anondict = []
    else:
        anondict = opts.anondict.split(',')

    project=opts.project
    bucket=opts.bucket
    dataset=opts.dataset
    dateBegin=opts.dateBegin
    dateEnd=opts.dateEnd

    
    query=get_data(dataset,dateBegin,dateEnd)

    argv = [
        '--project={0}'.format(project),
        '--job_name=flow',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(bucket),
        '--temp_location=gs://{0}/staging/'.format(bucket),
        '--runner=DataFlowRunner',
        '--requirements_file=./requirements.txt',
        '--region=us-central1',
        '--max_num_workers=10'
    ]

    p = beam.Pipeline(argv=argv)

    # Read the table rows into a PCollection (a Python Dictionary)
    
    # ReadFromBigQuery is itself a PTransform, so it is applied directly
    bq = p | 'GetData' >> beam.io.ReadFromBigQuery(project=project, query=query, use_standard_sql=True)

 
    anon = bq | 'Anonymize' >> beam.Map(lambda row: {
        'c':row['c'], 
        'd':re.sub(r'[0-9]+','#',replacing(str(row['c']),anondict))})

    table_schema = {
        'fields': [
            {'name': 'c', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'd', 'type': 'STRING', 'mode': 'NULLABLE'}

        ]
    }

    anon | 'WriteToBQ' >> beam.io.WriteToBigQuery(
        dataset+'.result',
        schema= table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
    )
    
    p.run()

if __name__ == '__main__':
    run()

The question is: how do I turn this pipeline into a templatable one, especially since I need runtime parameters to define both my query and the list of words I want to redact? When I convert the argparse arguments into pipeline options and switch them to add_value_provider_argument, it tells me I can't concatenate strings and runtime values, which makes sense, but I still need a workaround.
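
As far as I understand the error, a ValueProvider is only a placeholder while the template graph is being built; its .get() can only be called once the job is actually executing, e.g. inside a DoFn. A minimal sketch of that idea (the BuildQuery name and the way the options are passed in are just illustrative, not something I have working):

class BuildQuery(beam.DoFn):
    """Builds the query at runtime, where ValueProvider.get() is allowed."""
    def __init__(self, dataset, dateBegin, dateEnd):
        # ValueProvider objects, not strings
        self.dataset = dataset
        self.dateBegin = dateBegin
        self.dateEnd = dateEnd

    def process(self, unused_element):
        # .get() resolves the runtime values, so plain string concatenation works here
        yield get_data(self.dataset.get(),
                       self.dateBegin.get(),
                       self.dateEnd.get())

What I still don't see is how to hand a query built that way to the BigQuery read, and the same goes for the anonymization list.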

What I have tried already:

import argparse
import logging
import datetime,os
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import os



class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--project',default='myproject')
        parser.add_argument('--staging_location', default='gs://bucket/staging/')
        parser.add_argument('--temp_location', default='gs://bucket/temp/')
        parser.add_argument('--runner', required=True, default='DataFlowRunner')
        parser.add_argument('--requirements_file', default='./requirements.txt')
        parser.add_argument('--region',  default='us-central1')
        parser.add_argument('--max_num_workers',default='10')
        parser.add_value_provider_argument('--dataset')
        parser.add_value_provider_argument('--dateBegin')
        parser.add_value_provider_argument('--dateEnd')
        parser.add_value_provider_argument('--anondict')

def get_data(dataset,dateBegin,dateEnd):
    """
    given 'DATASET','2020-10-01','2020-10-31'
    returns query for getting data

    """
    query= '''

    SELECT
    DISTINCT
    IFNULL(a,
        b) AS c FROM
    `myproject.'''+dataset+'''.commonTable_'''+dataset+'''`
    WHERE
    date BETWEEN "'''+dateBegin+'''" and "'''+dateEnd+'''"

    '''

    return query


def replacing(item, anondict=[]):
    if not anondict:  # nothing to redact; an empty pattern would match everywhere
        return item
    return re.sub("(?i)" + "|".join("(" + anon + ")" for anon in anondict), "[REDACT]", item)


# Define pipeline runner
def run():

    # Command line arguments
    pipeline_options = PipelineOptions([
        '--project', 'myproject',
        '--staging_location', 'gs://bucket/staging/',
        '--temp_location', 'gs://bucket/temp/',
        '--runner', 'DataFlowRunner',
        '--requirements_file', './requirements.txt',
        '--region', 'us-central1',
        '--max_num_workers', '10'])

    opts = pipeline_options.view_as(UserOptions)


    # opts.anondict is a ValueProvider here, not a string, so this check is
    # always False and .split() cannot be called at construction time either
    if opts.anondict is None:
        anondict = []
    else:
        anondict = opts.anondict.split(',')

    project = opts.project
    # bucket is no longer needed here: staging/temp locations are set above
    dataset = opts.dataset
    dateBegin = opts.dateBegin
    dateEnd = opts.dateEnd

    
    # dataset, dateBegin and dateEnd are still ValueProvider objects at this
    # point, so the concatenation inside get_data() is what raises the
    # "can't concatenate" error mentioned above
    query = get_data(dataset, dateBegin, dateEnd)


    p = beam.Pipeline(options=pipeline_options)

    # Read the table rows into a PCollection (a Python Dictionary)
    
    # ReadFromBigQuery is itself a PTransform, so it is applied directly
    bq = p | 'GetData' >> beam.io.ReadFromBigQuery(project=project, query=query, use_standard_sql=True)

 
    anon = bq | 'Anonymize' >> beam.Map(lambda row: {
        'c':row['c'], 
        'd':re.sub(r'[0-9]+','#',replacing(str(row['c']),anondict))})

    table_schema = {
        'fields': [
            {'name': 'c', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'd', 'type': 'STRING', 'mode': 'NULLABLE'}

        ]
    }

    # dataset is also a ValueProvider, so this concatenation hits the same problem
    anon | 'WriteToBQ' >> beam.io.WriteToBigQuery(
        dataset+'.result',
        schema= table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
    )
    
    p.run()

if __name__ == '__main__':
    run()
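
The only piece I think I could defer myself is anondict: since beam.Map runs at execution time, I could pass the ValueProvider into the lambda and only call .get() and .split() there. A sketch of what I mean, as a drop-in for the 'Anonymize' step above (assuming opts.anondict really is a ValueProvider and that calling .get() inside the lambda is legitimate):

    anon = bq | 'Anonymize' >> beam.Map(
        lambda row, anon_vp=opts.anondict: {
            'c': row['c'],
            'd': re.sub(r'[0-9]+', '#',
                        replacing(str(row['c']),
                                  anon_vp.get().split(',') if anon_vp.get() else []))})

Even if that works, the query for ReadFromBigQuery and the output table name are still built at construction time, which is the part I need a workaround for. If I follow the classic templates guide linked below correctly, the template would then be staged with --template_location and launched with gcloud dataflow jobs run, passing the runtime values via --parameters, but that only helps once the pipeline itself can cope with ValueProviders.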

References: https://cloud.google.com/dataflow/docs/guides/templates/creating-templates
