I'm developing a Dataflow streaming job that checks a CSV file whenever an object is created in Cloud Storage (via a Pub/Sub notification).

I'm using Dataflow because it is a business requirement and for the message de-duplication management (which could also be done with Pub/Sub).

In each pipeline step I perform a particular type of check (the check rules are defined in a Google Sheet that I read with a step I've created in the pipeline). If all the steps pass, the file is copied to another bucket; otherwise an error email is sent. For this reason I need a global function that I can call from, ideally, all the steps.

I've declared the function after the library imports:

from email import header
from hashlib import new
from msilib.schema import Error
import json
import apache_beam as beam
from apache_beam import pvalue, window, GroupByKey
from datetime import datetime

import logging, uuid

from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions

TOPIC = "TOPIC_PATH"

def test():
    # send an email
    print("Send email")


class ReadPubSubMessage(beam.DoFn):
    def __init__(self):
        self.prod_bucket = "prd-v2"
        self.invalid_bucket = "prd-error"

    def process(self, element, *args, **kwargs):
        import uuid, json
        from datetime import datetime

        # Creating a uuid for the ingested file identification 
        try:
            uuidOne = uuid.uuid1()
            logging.info("Reading PubSub message")

            # Reading the PubSub JSON and extracting the main information
            res = json.loads(element)
            path_loaded_blob = res["id"]
            type_object = res["contentType"]
            
            # Getting the date from the blob path
            list_of_path_parts = path_loaded_blob.split("/")
            
            . # other code
            .
            .

            yield final_list
        except Exception as e:
            test()
            logging.error("Error: " + str(e))


beam_options = PipelineOptions()
google_cloud_options = beam_options.view_as(GoogleCloudOptions)
with beam.Pipeline(options=beam_options) as pipeline:
    
    check_csv = (pipeline 
        | "ReadData" >> beam.io.ReadFromPubSub(topic=TOPIC) # Ok 
        | "Decode" >> beam.Map(lambda x: x.decode('utf-8')) # Ok 
        | "Extract informations from PubSub message" >> beam.ParDo(ReadPubSubMessage()) # Ok 
        .
        .
        .
        | beam.Map(lambda x:logging.info(x))
    )

The error that I receive is:

NameError: name 'external_functions' is not defined

I think it's because the workers don't have the whole code scope, only the task code.

How can I write a global function in a streaming Dataflow job? Or can you share a basic example of using a global function across multiple tasks in Dataflow?

Thank you for your time.

I created a little snippet of code to simulate the situation. I've created another Python file containing a function that I call (after importing the module), but I get the same error.

I've also tried to define the function inside the main file, but obviously that doesn't work.

main.py is below:

import apache_beam as beam

import logging
# import library_list as external_functions
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions

# class stepGeneral(beam.DoFn):
#     def process(self, element):
#         variable = "External function"
#         logging.info("Called general method")
#         yield variable

TOPIC = "TOPIC NAME"

class step1(beam.DoFn):
    def process(self, element):
        variable = "first"
        logging.info("This is the " + variable + " step")
        yield variable

class step2(beam.DoFn):
    def process(self, element):
        variable = "second"
        logging.info("This is the " + variable + " step")
        # stepGeneral.process()
        external_functions.stepGeneral()
        yield variable

beam_options = PipelineOptions()
google_cloud_options = beam_options.view_as(GoogleCloudOptions)
with beam.Pipeline(options=beam_options) as pipeline:
    
    (pipeline
        | "ReadData" >> beam.io.ReadFromPubSub(topic=TOPIC) # Ok 
        | "First step" >> beam.ParDo(step1())
        | "Second step" >> beam.ParDo(step2())
        # | "window" >> beam.WindowInto(beam.window.FixedWindows(1)) 
        | beam.Map(lambda x:logging.info(x))
    )

And below is library_list.py:

import logging

def stepGeneral():
    variable = "External function"
    logging.info("Called general method")
    yield variable

There are 2 answers

Answer from elrorris:

As your code grows beyond a single file (i.e. adding classes), you need to pay more attention to building and submitting the container.

Essentially you need to start building your own custom container (https://cloud.google.com/dataflow/docs/guides/using-custom-containers) that inherits from the Python + Apache Beam container you want, meaning that your Dockerfile will have to start with:

FROM apache/beam_python_sdk:

e.g. FROM apache/beam_python3.8_sdk:2.38.0

When using custom containers, you need to re-run the gcloud builds command every time you add a new file, to ensure your file makes it into the container.

gcloud --project my-google-project builds submit -t THEIMAGE . --timeout 20m

THEIMAGE is the image location to use in gcr.io. (You can provide a config.yaml file instead of the longer command, as in the satellites tutorial.)
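Once the image is built and pushed, the pipeline has to be pointed at it. A minimal sketch, assuming a recent Beam SDK where WorkerOptions exposes the sdk_container_image option (the image URI is a placeholder):

from apache_beam.options.pipeline_options import PipelineOptions, WorkerOptions

# Placeholder: the image pushed with the gcloud builds command above.
THEIMAGE = "gcr.io/my-google-project/my-dataflow-image:latest"

beam_options = PipelineOptions()
# Run the workers on the custom container instead of the stock SDK image.
# (Depending on the SDK version, Dataflow may also require Runner v2 for this.)
beam_options.view_as(WorkerOptions).sdk_container_image = THEIMAGE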

The satellites tutorial is, in my opinion, quite illustrative of the whole process: https://cloud.google.com/dataflow/docs/tutorials/satellite-images-gpus

I hope this helps.

Answer from robertwb:

See https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/ for how to ship and manage pipeline dependencies for your Python code. You could also consider using cloudpickle, which may resolve some issues as well (see https://beam.apache.org/blog/beam-2.36.0/).
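To make that concrete, here is a minimal sketch of the options that page describes, reusing the file names from the question (exact flag availability depends on your Beam version, so treat this as an assumption to verify):

from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

beam_options = PipelineOptions()
setup = beam_options.view_as(SetupOptions)

# Pickle the main session so module-level imports and helpers
# (e.g. test() in main.py) are available on the workers.
setup.save_main_session = True

# Ship local modules such as library_list.py by packaging them with a
# setup.py and pointing the pipeline at it.
setup.setup_file = "./setup.py"

# Optionally switch the pickler to cloudpickle (opt-in since Beam 2.36).
# setup.pickle_library = "cloudpickle"

With the main session saved (or cloudpickle enabled) and library_list shipped to the workers, the import library_list as external_functions line should resolve on the workers and the NameError should disappear.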