I’m working on a batch Apache Beam pipeline in Python that reads a large volume of elements from multiple BigQuery tables and writes them to external databases (e.g. MongoDB). I want to ensure that batches are flushed at regular intervals without overloading the external service, with the batch size and interval determined dynamically from external metrics (e.g. disk IOPS). In other words, I'd like to implement some kind of throttling/rate-limiting mechanism.
I want to avoid flushing a batch before a set interval (e.g. 1 or 2 seconds) has elapsed, even if the maximum batch size is reached earlier. Once the interval is reached, however, the flushed batch must not exceed the maximum size. The standard GroupIntoBatches transform doesn't quite fit: it flushes as soon as the batch size is reached and offers no dynamic control over the interval.
Its implementation:
class _GroupIntoBatchesDoFn(DoFn):
  def process(
      self,
      element,
      window=DoFn.WindowParam,
      element_state=DoFn.StateParam(ELEMENT_STATE),
      count_state=DoFn.StateParam(COUNT_STATE),
      window_timer=DoFn.TimerParam(WINDOW_TIMER),
      buffering_timer=DoFn.TimerParam(BUFFERING_TIMER)):
    # Allowed lateness not supported in Python SDK
    # https://beam.apache.org/documentation/programming-guide/#watermarks-and-late-data
    window_timer.set(window.end)
    element_state.add(element)
    count_state.add(1)
    count = count_state.read()
    if count == 1 and max_buffering_duration_secs > 0:
      # This is the first element in batch. Start counting buffering time if a
      # limit was set.
      # pylint: disable=deprecated-method
      buffering_timer.set(clock() + max_buffering_duration_secs)
    if count >= batch_size:
      return self.flush_batch(element_state, count_state, buffering_timer)
I'm concerned that, given the volume of data read from BigQuery, the `count >= batch_size` branch will trigger far more often than the intended interval, so batches would flush back-to-back and no real throttling would be achieved.
I could probably use a simple sleep, but I'd like to know if there is a better approach. Thanks!
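For completeness, the "simple sleep" I have in mind would look roughly like a token-bucket limiter wrapped around the flush call. This is a plain-Python sketch, nothing Beam-specific; the class name and parameters are mine, and blocking a worker thread this way is exactly the crudeness I'd like to avoid:

```python
import threading
import time


class TokenBucket:
  """Simple token-bucket rate limiter: allows at most `rate` acquisitions
  per second on average, with bursts of up to `capacity`."""

  def __init__(self, rate, capacity):
    self.rate = float(rate)          # tokens added per second
    self.capacity = float(capacity)  # maximum burst size
    self.tokens = float(capacity)    # start with a full bucket
    self.last = time.monotonic()
    self.lock = threading.Lock()

  def acquire(self, tokens=1.0):
    """Block (sleep) until `tokens` are available, then consume them."""
    while True:
      with self.lock:
        now = time.monotonic()
        # Refill based on elapsed time, capped at the bucket capacity.
        self.tokens = min(
            self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= tokens:
          self.tokens -= tokens
          return
        wait = (tokens - self.tokens) / self.rate
      time.sleep(wait)


# Hypothetical use inside a DoFn's flush path:
# limiter = TokenBucket(rate=0.5, capacity=1)  # at most one flush every 2s
# limiter.acquire()
# mongo_collection.bulk_write(batch_ops)       # placeholder for the real sink
```

The downside is that the sleep ties up a worker thread per key, whereas the timer-based approach leaves the scheduling to the runner.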