How to create a Beam template with the current date as an input (updated daily) [Create from GET request]

I am trying to create a Dataflow job that runs daily with Cloud Scheduler. I need to fetch data from an external API using GET requests, so I need the current date as an input. However, when I export the Dataflow job as a template for scheduling, the date input stays fixed at the value it had when the template was built and is not updated daily. I have been looking around for a solution and came across ValueProvider, but my pipeline, which starts with apache_beam.transforms.Create, always returns the error 'RuntimeValueProvider(option: test, type: str, default_value: 'killme').get() not called from a runtime context' when the ValueProvider is not specified.

Is there any way I can overcome this? It seems like such a simple problem, yet I cannot make it work no matter what I try. I would really appreciate any ideas!

Answer by Reza Rokni (best answer)

You can use the ValueProvider interface to pass runtime parameters to your pipeline. To access the value within a DoFn, pass the ValueProvider in as a constructor parameter and call .get() inside process(), as in the following example from the Beam documentation:

https://beam.apache.org/documentation/patterns/pipeline-options/#retroactively-logging-runtime-parameters

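import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import RuntimeValueProvider

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument('--string_value', type=str)

class LogValueProvidersFn(beam.DoFn):
  def __init__(self, string_vp):
    self.string_vp = string_vp

  # process() runs on the workers at execution time, so .get() is allowed here.
  # This example logs the ValueProvider value, but
  # you could store it by pushing it to an external database.
  def process(self, an_int):
    logging.info('The string_value is %s' % self.string_vp.get())
    # Another option (where you don't need to pass the value at all) is:
    logging.info(
        'The string value is %s' %
        RuntimeValueProvider.get_value('string_value', str, ''))

beam_options = PipelineOptions()
my_options = beam_options.view_as(MyOptions)

with beam.Pipeline(options=beam_options) as pipeline:
  _ = (
      pipeline
      | beam.Create([None])
      | 'LogValueProvs' >> beam.ParDo(
          LogValueProvidersFn(my_options.string_value)))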
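
Applied to the question, one way to get a date that changes daily is to resolve it inside process(), at run time, rather than while the template is being built. The sketch below is only an illustration under stated assumptions, not code from the Beam documentation: resolve_run_date, build_request_url, FetchForDateFn, the date query parameter, and the example.com URL are all hypothetical. The date logic is kept in plain Python so it can be checked without a runner, and the trailing comment shows where a DoFn would call it.

```python
import datetime
from urllib.parse import urlencode


def resolve_run_date(override=None, now=None):
    """Return the date to query as an ISO string (YYYY-MM-DD).

    override: optional date string, e.g. the result of a ValueProvider's
      .get() called inside DoFn.process(); None or '' means "use today".
    now: optional timezone-aware datetime, injectable for testing.
    """
    if override:
        # Parse and re-format so a malformed override fails loudly.
        return datetime.date.fromisoformat(override).isoformat()
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    return now.date().isoformat()


def build_request_url(base_url, run_date):
    """Build the GET URL; the 'date' query parameter is an assumption."""
    query = urlencode({"date": run_date})
    return f"{base_url}?{query}"


# Hypothetical use inside a DoFn, following the pattern above:
#
#   class FetchForDateFn(beam.DoFn):
#     def __init__(self, date_vp):
#       self.date_vp = date_vp  # keep the ValueProvider, not its value
#
#     def process(self, _):
#       run_date = resolve_run_date(self.date_vp.get())  # evaluated per run
#       yield build_request_url('https://example.com/api', run_date)
```

With this shape, the scheduler could launch the template without a date parameter and each run would use the current UTC date, while an explicit date could still be passed for a backfill. The pipeline would still start with beam.Create([None]) so that process() is called once.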

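You may also want to have a look at Flex Templates, which build the pipeline graph each time the template is launched, so a date computed during pipeline construction is re-evaluated on every run: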

https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates