Unable to access PCollection outside with block

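```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db  # from the beam-nuggets package

# source_config, sideinput_bq_config, relation_repl_key, bq, KeyCheck and
# load are defined elsewhere in the asker's code.
for table_name, key_pair in relation_repl_key.items():
  try:
    with beam.Pipeline(options=PipelineOptions()) as p:
      PCollection = p | "Reading from source database" >> relational_db.ReadFromDB(
        source_config=source_config,
        table_name=table_name,
        query="SELECT {} FROM {}".format(
          key_pair["col"],
          table_name
        )
      )
      side_input = bq(p, sideinput_bq_config, table_name, key_pair["repl_key"])
  except RuntimeError:
    pass
  else:
    PCollection | "Selecting updated rows" >> beam.ParDo(
      KeyCheck(), beam.pvalue.AsSingleton(side_input)
    )
  finally:
    load(PCollection, table_name, key_pair["primary_key"], key_pair["jsonb_col"])
```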

I'm unable to access the PCollection outside the `with` block: running `PCollection | beam.Map(print)` inside `finally:` prints nothing.

Answer by Kenn Knowles

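A PCollection is scoped to a Pipeline. Outside the `with` block, the PCollection is no longer a valid reference: leaving the block runs the pipeline and waits for it to finish, so transforms applied to the PCollection afterwards (the `ParDo` in `else:` and the `load` step in `finally:`) are never executed. Apply every transform inside the `with` block.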