Apache Beam Dataflow Properly Sharding and Mapping Messages from PubSub to GCS Efficiently


I'm having an issue with Dataflow where the job seems to lag out really hard.

I'm ingesting hundreds of thousands of events a second into Pub/Sub, then using a Dataflow job to process them.

For each message, I'm processing it and assigning it a key (there are fewer than 10 keys), based on an example. I don't think I need the last couple of steps.

def expand(self, pcoll):
    return (
        pcoll
        # Assigns window info to each Pub/Sub message based on its
        # publish timestamp.
        | "Window into Fixed Intervals"
        >> beam.WindowInto(window.FixedWindows(self.window_size))
        | "Output Event Str obj for each message" >> beam.ParDo(ProcessEvent())
        | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
        | "Groupby" >> beam.GroupByKey()
        | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
    )

Then I'm doing a windowed write with GcsIO, writing a file for each key every minute.

The issue is that the last step, writeBatchesToGCS, takes all the data and writes it line by line. Obviously this isn't optimal, because it's basically funnelling all the windowed data into one worker to handle the job of actually putting it into GCS, which is causing a massive slowdown in the job execution.

with beam.io.gcp.gcsio.GcsIO().open(filename=a_file, mode="w") as a_f, \
     beam.io.gcp.gcsio.GcsIO().open(filename=quote_file, mode="w") as q_f:
  for element in batch:
    try:
      output = element['message_body']
      if 'A' in output:
        a_f.write(output['A'])
      if 'B' in output:
        q_f.write(output['B'])
      .....

Obviously this isn't great code, but I'm not familiar with Dataflow. Any thoughts?

1 Answer

Answered by Reza Rokni

In the first section (before the write), having 10 keys means the parallelism in that stage will be limited to 10. If possible, it's worth looking at using a CombinePerKey. It's also worth exploring CombinePerKey.with_hot_key_fanout if using a combiner is an option.
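As a rough illustration of the fanout idea (not your actual per-key logic): this sketch assumes the per-key work can be expressed as a combiner, uses a simple per-key count as a stand-in, and picks a fanout of 16 as an arbitrary value to tune.

import apache_beam as beam

# keyed is assumed to be a PCollection of (key, event) pairs.
counts = (
    keyed
    | "Map to ones" >> beam.Map(lambda kv: (kv[0], 1))
    # with_hot_key_fanout spreads the pre-combine work for a hot key
    # across many workers before the final per-key merge.
    | "Count per key" >> beam.CombinePerKey(sum).with_hot_key_fanout(16)
)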

For the output, when possible make use of textio when outputting text files. The num_shards property will need to be set in streaming mode, but assuming it's OK for the output files to be in the format ..._1_of_x, this can help with the parallelism of the output. You can branch the pipeline with a multi-output transform to send different records to different files, as in the sketch below.
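A rough sketch of branching with tagged outputs and writing each branch with textio; the output tag names, the 'A'/'B' field checks, the GCS paths, and the shard count are all placeholders to adapt.

import apache_beam as beam
from apache_beam import pvalue

class SplitByType(beam.DoFn):
    # Routes each element to a tagged output based on its message body.
    def process(self, element):
        body = element["message_body"]
        if "A" in body:
            yield pvalue.TaggedOutput("a_records", body["A"])
        if "B" in body:
            yield pvalue.TaggedOutput("b_records", body["B"])

split = events | "Split by type" >> beam.ParDo(SplitByType()).with_outputs(
    "a_records", "b_records")

# num_shards must be set explicitly for streaming writes.
_ = split.a_records | "Write A" >> beam.io.WriteToText(
    "gs://my-bucket/output/a", num_shards=10)
_ = split.b_records | "Write B" >> beam.io.WriteToText(
    "gs://my-bucket/output/b", num_shards=10)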

[Edit based on comments] If the keys are just a way to send the elements to a specific file, then have a look at fileio dynamic destinations.
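A minimal sketch of that approach; the record shape (a dict with "key" and "line" fields), the TextLineSink helper, and the bucket path are all assumptions for illustration.

import apache_beam as beam
from apache_beam.io import fileio

class TextLineSink(fileio.FileSink):
    # Hypothetical sink: writes each record's "line" field as a UTF-8 text line.
    def open(self, fh):
        self._fh = fh
    def write(self, record):
        self._fh.write(record["line"].encode("utf-8"))
        self._fh.write(b"\n")
    def flush(self):
        pass

_ = (
    events  # assumed: dicts like {"key": "A", "line": "..."}
    | "Write per key" >> fileio.WriteToFiles(
        path="gs://my-bucket/output/",
        destination=lambda record: record["key"],        # the key picks the file
        sink=lambda dest: TextLineSink(),
        file_naming=fileio.destination_prefix_naming(),  # file names include the key
    )
)

With this, the runner handles grouping elements by destination and sharding the writes, so there is no need for the dummy-key GroupByKey or the single-worker GcsIO loop.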