for table_name, key_pair in relation_repl_key.items():
try:
with beam.Pipeline(options=PipelineOptions()) as p:
PCollection = p | "Reading from source database" >> relational_db.ReadFromDB(
source_config=source_config,
table_name=table_name,
query="SELECT {} FROM {}".format(
key_pair["col"],
table_name
)
)
side_input = bq(p, sideinput_bq_config, table_name, key_pair["repl_key"])
except RuntimeError:
pass
else:
PCollection | "Selecting updated rows" >> beam.ParDo(
KeyCheck(), beam.pvalue.AsSingleton(side_input)
)
finally:
load(PCollection, table_name, key_pair["primary_key"], key_pair["jsonb_col"])
I'm unable to access the PCollection
outside the with
block. Running PCollection | beam.Map(print)
inside finally:
returns nothing.
A PCollection is scoped to a Pipeline. Outside the
with
block, the PCollection is no longer a valid reference.