NameError: name 'beam' is not defined while running 'Create beam Row-ptransform

21 views Asked by At

I am working on a use case where I am reading from PubSub and I want to write aggregated values to bigquery.

Here is my PubSub input which I am writing to that Topic:

b"('B', 'Stream1', 77)"
b"('C', 'Stream3', 11)"

Here is my code:

import json
import os
import typing
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.transforms.sql import SqlTransform

table_spec1 = bigquery.TableReference(
    projectId='phonic-vortex-417406',
    datasetId='training',
    tableId='dflow_agg')


SCHEMA = {
        "fields": [
            {
                "name": 'name',
                "type": "STRING",
                
            },
            {
                "name": 'stream',
                "type": "STRING"
            },
            {
                "name": 'salary',
                "type": "INT64",
                "mode": "NULLABLE"
            }
        ]
    }

pipeline_options = PipelineOptions( streaming=True)

class ProcessWords(beam.DoFn):
  def process(self, ele):
    yield eval(ele)





def run():
    with beam.Pipeline(options=pipeline_options) as p:


        out= (
            p
            | "Read from Pub/Sub subscription" >> beam.io.ReadFromPubSub(subscription="projects/<PROJECT_ID>/subscriptions/Test-sub")
            | "Decode and parse Json" >> beam.Map(lambda element: element.decode("utf-8"))
            |"Formatting " >> beam.ParDo(ProcessWords()) #.with_output_types(CommonLog)
            | "Create beam Row" >> beam.Map(lambda x: beam.Row(Name=str(x[0]),Stream=str(x[1]),Salary=int(x[2])))
            
            |"window" >> beam.WindowInto(beam.window.FixedWindows(30))  
            | SqlTransform(
                """
                    SELECT
                    Name,
                    AVG(Salary) AS avg_sal
                    FROM PCOLLECTION
                    GROUP BY Name
                    
                """)
        # SqlTransform yields python objects with attributes corresponding to
        # the outputs of the query.
        # Collect those attributes, as well as window information, into a dict
        | "Assemble Dictionary" >> beam.Map(
        lambda row,
        window=beam.DoFn.WindowParam: {
            "Name": row.Name,
            "avg_sal": row.avg_sal,
            "window_start": window.start.to_rfc3339(),
            "window_end": window.end.to_rfc3339()
        })



            # | "Write to BigQuery" >> beam.io.WriteToBigQuery(
            #     output_table,
            #     schema=SCHEMA,
            #     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            #     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            # )
            # |beam.MapTuple(lambda N,S,M : {"Name":N,"Stream":S,"Marks":M})
            | beam.Map(print)
        )
    

    p.run()

run()

I am running this script using below command"

python3 stream_dflow.py --runner=DataflowRunner --project="<PROJECT_ID>" --region="us-east1" --temp_location="gs://datafllow-stg/staging" --save_main_session=True

But, I am getting this error.

NameError: name 'beam' is not defined [while running 'Create beam Row-ptransform-36']

but, beam is already defined.

Does anyone know what's the issue?

I have tried above all things, but still getting error

1

There are 1 answers

0
AInguva On

For the pipeline options, --save_main_session is a boolean and specifying --save_main_session is sufficient enough. you don't need to pass true(--save_main_session=True).

Let's structure the code to call your run method as

if __name__ == '__main__':
  run()

since beam python pickles main and somehow not running your function run() under main is causing an issue?