Throttling with Beam Timely and Stateful Processing


I'm working on a batch Apache Beam pipeline in Python that reads a large volume of elements from multiple BigQuery tables and writes them to external databases (e.g., MongoDB). I want the batches to be flushed at regular intervals without overloading the external service, and I plan to determine the batch size and flush interval dynamically from external metrics (e.g., disk IOPS). In other words, I'd like to implement some kind of throttling/rate-limiting mechanism.
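
For context, the overall shape of the pipeline is roughly the following sketch. The query, the shard key, and the MongoDB settings are placeholders, and the 'Batch' step in the middle (for now just GroupIntoBatches) is the part I want to make throttle-aware:

    import apache_beam as beam
    from apache_beam.io.gcp.bigquery import ReadFromBigQuery
    from apache_beam.io.mongodbio import WriteToMongoDB

    with beam.Pipeline() as p:
      rows = p | 'ReadBQ' >> ReadFromBigQuery(
          query='SELECT * FROM `project.dataset.table`',  # placeholder query
          use_standard_sql=True)

      batches = (
          rows
          # Stateful batching needs keyed input; this modulo key only spreads
          # rows over a fixed number of shards (the key scheme is a placeholder).
          | 'Key' >> beam.Map(lambda row: (hash(row['id']) % 16, row))
          # The step I want to control; for now it is just GroupIntoBatches.
          | 'Batch' >> beam.GroupIntoBatches(
              batch_size=500, max_buffering_duration_secs=2))

      (batches
       | 'Unbatch' >> beam.FlatMap(lambda kv: kv[1])
       # Stand-in sink; in my pipeline the write step is what needs rate limiting.
       | 'WriteMongo' >> WriteToMongoDB(
           uri='mongodb://host:27017', db='mydb', coll='mycoll'))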

I want to avoid flushing a batch before a set interval (e.g., 1 or 2 seconds) has elapsed, even if the maximum batch size is reached earlier. However, once the interval is reached, I need to ensure that no flushed batch exceeds the maximum size. The standard GroupIntoBatches transform in Beam doesn't quite fit my needs, as it doesn't allow for this kind of control.
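
For reference, this is roughly how I would use it today (the sizes here are made up); it flushes as soon as batch_size is reached and only treats the duration as an upper bound on buffering, which is the opposite of what I want:

    import apache_beam as beam

    # Toy, runnable illustration. GroupIntoBatches flushes as soon as EITHER
    # limit is hit: count >= batch_size OR max_buffering_duration_secs elapsed.
    # With a fast source the size limit always wins, so batches are emitted far
    # more often than every 2 seconds.
    with beam.Pipeline() as p:
      (p
       | beam.Create([('shard', i) for i in range(10)])
       | beam.GroupIntoBatches(batch_size=3, max_buffering_duration_secs=2)
       | beam.Map(print))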

The relevant part of its implementation, _GroupIntoBatchesDoFn.process:

    class _GroupIntoBatchesDoFn(DoFn):
      def process(
          self,
          element,
          window=DoFn.WindowParam,
          element_state=DoFn.StateParam(ELEMENT_STATE),
          count_state=DoFn.StateParam(COUNT_STATE),
          window_timer=DoFn.TimerParam(WINDOW_TIMER),
          buffering_timer=DoFn.TimerParam(BUFFERING_TIMER)):
        # Allowed lateness not supported in Python SDK
        # https://beam.apache.org/documentation/programming-guide/#watermarks-and-late-data
        window_timer.set(window.end)
        element_state.add(element)
        count_state.add(1)
        count = count_state.read()
        if count == 1 and max_buffering_duration_secs > 0:
          # This is the first element in batch. Start counting buffering time if a
          # limit was set.
          # pylint: disable=deprecated-method
          buffering_timer.set(clock() + max_buffering_duration_secs)
        if count >= batch_size:
          return self.flush_batch(element_state, count_state, buffering_timer)

I'm concerned that, because of the large volume of data read from BigQuery, the count >= batch_size check will be hit almost immediately, so batches will be flushed far more frequently than the intended interval and no real throttling will be achieved. For example, with batch_size=500 and a few thousand elements per second arriving for a key, the size limit is reached every few hundred milliseconds, so the buffering timer effectively never gets a chance to fire.
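
To get around this, here is a rough sketch of the kind of stateful DoFn I have in mind (the class and parameter names are mine, and I haven't verified it on a distributed runner). It buffers values per key, arms a processing-time timer on the first element of an interval, flushes only when that timer (or the end of the window) fires, and re-splits the buffer into chunks of at most the maximum batch size:

    import time

    import apache_beam as beam
    from apache_beam.coders.coders import PickleCoder, VarIntCoder
    from apache_beam.transforms.combiners import CountCombineFn
    from apache_beam.transforms.timeutil import TimeDomain
    from apache_beam.transforms.userstate import (
        BagStateSpec, CombiningValueStateSpec, TimerSpec, on_timer)


    class IntervalBatchDoFn(beam.DoFn):
      """Buffers (key, value) elements and flushes them only when a
      processing-time timer fires, re-splitting the buffer into chunks of at
      most max_batch_size. Sketch only: it assumes the runner supports
      processing-time timers for this (batch) pipeline."""

      ELEMENTS = BagStateSpec('elements', PickleCoder())
      COUNT = CombiningValueStateSpec('count', VarIntCoder(), CountCombineFn())
      FLUSH_TIMER = TimerSpec('flush', TimeDomain.REAL_TIME)
      EOW_TIMER = TimerSpec('end_of_window', TimeDomain.WATERMARK)

      def __init__(self, max_batch_size, flush_interval_secs):
        self.max_batch_size = max_batch_size
        self.flush_interval_secs = flush_interval_secs

      def process(self,
                  element,
                  window=beam.DoFn.WindowParam,
                  elements=beam.DoFn.StateParam(ELEMENTS),
                  count=beam.DoFn.StateParam(COUNT),
                  flush_timer=beam.DoFn.TimerParam(FLUSH_TIMER),
                  eow_timer=beam.DoFn.TimerParam(EOW_TIMER)):
        if count.read() == 0:
          # First element of a new interval: arm the timers. Unlike
          # _GroupIntoBatchesDoFn, nothing is flushed here when the size is hit.
          flush_timer.set(time.time() + self.flush_interval_secs)
          eow_timer.set(window.end)
        _, value = element
        elements.add(value)
        count.add(1)

      @on_timer(FLUSH_TIMER)
      def on_flush(self,
                   elements=beam.DoFn.StateParam(ELEMENTS),
                   count=beam.DoFn.StateParam(COUNT)):
        yield from self._drain(elements, count)

      @on_timer(EOW_TIMER)
      def on_window_end(self,
                        elements=beam.DoFn.StateParam(ELEMENTS),
                        count=beam.DoFn.StateParam(COUNT)):
        # Emit whatever is still buffered when the window closes.
        yield from self._drain(elements, count)

      def _drain(self, elements, count):
        buffered = list(elements.read())
        elements.clear()
        count.clear()
        # Cap every emitted batch at max_batch_size, even if far more elements
        # accumulated during the interval.
        for i in range(0, len(buffered), self.max_batch_size):
          yield buffered[i:i + self.max_batch_size]

Usage would be something like keyed_rows | beam.ParDo(IntervalBatchDoFn(max_batch_size=500, flush_interval_secs=2)). Since state and timers are per key, the effective flush rate is roughly one timer firing per key per interval, so the shard-key fan-out would have to be tuned together with the interval and batch size. Is something along these lines a reasonable direction?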

Alternatively, I could probably just throttle with a simple sleep, but I'd like to know if there is a better approach. Thanks!
