I am able to run a custom flow whenever I call the .py file directly, passing arguments through the argparse library. However, when I try to turn those arguments into runtime arguments, it doesn't work. Here is a sample of the code as a standalone pipe:
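For reference, this is how I invoke it today (assuming the file is called pipe.py; the values are placeholders):

python pipe.py --project=myproject --bucket=abucket --dataset=MYDATASET --dateBegin=2020-10-01 --dateEnd=2020-10-31 --anondict=foo,bar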
import argparse
import re
import apache_beam as beam
def get_data(dataset, dateBegin, dateEnd):
    """
    Given 'DATASET', '2020-10-01', '2020-10-31',
    returns the query for getting the data.
    """
    query = '''
    SELECT DISTINCT
      IFNULL(a, b) AS c
    FROM `myproject.''' + dataset + '''.commonTable_''' + dataset + '''`
    WHERE date BETWEEN "''' + dateBegin + '''" AND "''' + dateEnd + '''"
    '''
    return query
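# For example, get_data('MYDATASET', '2020-10-01', '2020-10-31') renders roughly to:
#   SELECT DISTINCT IFNULL(a, b) AS c
#   FROM `myproject.MYDATASET.commonTable_MYDATASET`
#   WHERE date BETWEEN "2020-10-01" AND "2020-10-31"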
def replacing(item, anondict=None):
    if not anondict:  # an empty pattern "(?i)" would match everywhere and redact everything
        return item
    return re.sub("(?i)" + "|".join("(" + anon + ")" for anon in anondict), "[REDACT]", item)
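# For example, replacing("call John today", ["john"]) returns "call [REDACT] today".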
# Define pipeline runner
def run():
    # Command line arguments
    parser = argparse.ArgumentParser(description='Run the flow')
    parser.add_argument('--project', default='myproject')
    parser.add_argument('--bucket', default='abucket')
    parser.add_argument('--dataset', required=True)
    parser.add_argument('--dateBegin', required=True)
    parser.add_argument('--dateEnd', required=True)
    parser.add_argument('--anondict')
    opts = parser.parse_args()
    anondict = opts.anondict.split(',') if opts.anondict else []
    project = opts.project
    bucket = opts.bucket
    dataset = opts.dataset
    dateBegin = opts.dateBegin
    dateEnd = opts.dateEnd
    query = get_data(dataset, dateBegin, dateEnd)
    argv = [
        '--project={0}'.format(project),
        '--job_name=flow',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(bucket),
        '--temp_location=gs://{0}/temp/'.format(bucket),
        '--runner=DataflowRunner',
        '--requirements_file=./requirements.txt',
        '--region=us-central1',
        '--max_num_workers=10'
    ]
    p = beam.Pipeline(argv=argv)
    # Read the table rows into a PCollection (a Python dictionary)
    bq = p | 'GetData' >> beam.io.ReadFromBigQuery(project=project, query=query, use_standard_sql=True)
    anon = bq | 'Anonymize' >> beam.Map(lambda row: {
        'c': row['c'],
        'd': re.sub(r'[0-9]+', '#', replacing(str(row['c']), anondict))})
    table_schema = {
        'fields': [
            {'name': 'c', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'd', 'type': 'STRING', 'mode': 'NULLABLE'}
        ]
    }
    anon | 'WriteToBQ' >> beam.io.WriteToBigQuery(
        dataset + '.result',
        schema=table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
    )
    p.run()
if __name__ == '__main__':
    run()
The question is: how do I turn this pipeline into a templatable one, especially since I'll need runtime parameters to define my query and the list of words I want to redact? When I convert the argparse arguments into pipeline options and switch them to add_value_provider_argument, it says I can't concatenate strings and runtime values, which makes sense, but I still need a workaround.
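For the redaction list I think I can at least see the shape of the fix: from what I've read, a ValueProvider should only be resolved with .get() inside a DoFn, at execution time on the workers. A minimal sketch of what I mean, reusing replacing() from above (untested, and the class name is mine):

class Redact(beam.DoFn):
    def __init__(self, anondict_vp):
        # anondict_vp is a ValueProvider; store it unresolved
        self.anondict_vp = anondict_vp

    def process(self, row):
        # .get() is legal here because process() runs at execution time
        raw = self.anondict_vp.get()
        anondict = raw.split(',') if raw else []
        yield {'c': row['c'],
               'd': re.sub(r'[0-9]+', '#', replacing(str(row['c']), anondict))}

so the Anonymize step would become anon = bq | 'Anonymize' >> beam.ParDo(Redact(opts.anondict)) once anondict is declared with add_value_provider_argument. But the query is the part I can't untangle.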
What I have tried already:
import re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # project, staging_location, temp_location, runner, region,
        # requirements_file and max_num_workers are already defined by Beam's
        # built-in options, so redefining them here raises a conflict;
        # only the custom runtime parameters are declared.
        parser.add_value_provider_argument('--dataset')
        parser.add_value_provider_argument('--dateBegin')
        parser.add_value_provider_argument('--dateEnd')
        parser.add_value_provider_argument('--anondict')
def get_data(dataset, dateBegin, dateEnd):
    """
    Given 'DATASET', '2020-10-01', '2020-10-31',
    returns the query for getting the data.
    """
    query = '''
    SELECT DISTINCT
      IFNULL(a, b) AS c
    FROM `myproject.''' + dataset + '''.commonTable_''' + dataset + '''`
    WHERE date BETWEEN "''' + dateBegin + '''" AND "''' + dateEnd + '''"
    '''
    return query
def replacing(item, anondict=None):
    if not anondict:  # an empty pattern "(?i)" would match everywhere and redact everything
        return item
    return re.sub("(?i)" + "|".join("(" + anon + ")" for anon in anondict), "[REDACT]", item)
# Define pipeline runner
def run():
    # Command line arguments
    pipeline_options = PipelineOptions(['--project', 'myproject',
                                        '--staging_location', 'gs://bucket/staging/',
                                        '--temp_location', 'gs://bucket/temp/',
                                        '--runner', 'DataflowRunner',
                                        '--requirements_file', './requirements.txt',
                                        '--region', 'us-central1',
                                        '--max_num_workers', '10'])
    opts = pipeline_options.view_as(UserOptions)
    project = pipeline_options.view_as(GoogleCloudOptions).project
    dataset = opts.dataset
    dateBegin = opts.dateBegin
    dateEnd = opts.dateEnd
    # This is where it breaks: anondict, dataset, dateBegin and dateEnd are
    # RuntimeValueProviders here, not strings, so split() and the string
    # concatenation inside get_data() both fail at graph-construction time.
    anondict = opts.anondict.split(',') if opts.anondict else []
    query = get_data(dataset, dateBegin, dateEnd)
    p = beam.Pipeline(options=pipeline_options)
    # Read the table rows into a PCollection (a Python dictionary)
    bq = p | 'GetData' >> beam.io.ReadFromBigQuery(project=project, query=query, use_standard_sql=True)
    anon = bq | 'Anonymize' >> beam.Map(lambda row: {
        'c': row['c'],
        'd': re.sub(r'[0-9]+', '#', replacing(str(row['c']), anondict))})
    table_schema = {
        'fields': [
            {'name': 'c', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'd', 'type': 'STRING', 'mode': 'NULLABLE'}
        ]
    }
    anon | 'WriteToBQ' >> beam.io.WriteToBigQuery(
        dataset + '.result',  # same problem: dataset is a ValueProvider here
        schema=table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
    )
    p.run()
if __name__ == '__main__':
    run()
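If I read the docs right, beam.io.ReadFromBigQuery accepts a ValueProvider for its query argument, so the direction I'm considering for the query is a small custom ValueProvider that defers get_data() until the wrapped runtime parameters are accessible. A sketch under that assumption (the class name is mine):

from apache_beam.options.value_provider import ValueProvider

class QueryValueProvider(ValueProvider):
    def __init__(self, dataset_vp, dateBegin_vp, dateEnd_vp):
        self.vps = (dataset_vp, dateBegin_vp, dateEnd_vp)

    def is_accessible(self):
        return all(vp.is_accessible() for vp in self.vps)

    def get(self):
        # Only called at execution time, when the runtime values exist,
        # so the string concatenation inside get_data() is safe here
        return get_data(*[vp.get() for vp in self.vps])

which I would pass straight to the read, e.g. beam.io.ReadFromBigQuery(query=QueryValueProvider(opts.dataset, opts.dateBegin, opts.dateEnd), use_standard_sql=True). Is that a sane approach, or is there a more idiomatic way?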
References: https://cloud.google.com/dataflow/docs/guides/templates/creating-templates