Currently we have a Dataflow job which reads from Pub/Sub and writes Avro files to GCS using FileIO.writeDynamic. When we test with, say, 10000 events/sec, it cannot keep up because the WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards step is very slow. Below is the snippet we are using to write. How can we improve it?

    PCollection<Event> windowedWrites = input.apply("Global Window",
        Window.<Event>into(new GlobalWindows())
            .triggering(Repeatedly.forever(
                AfterFirst.of(
                    AfterPane.elementCountAtLeast(50000),
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(DurationUtils.parseDuration(windowDuration)))))
            .discardingFiredPanes());

    return windowedWrites
        .apply("WriteToAvroGCS", FileIO.<EventDestination, Event>writeDynamic()
            .by(groupFn)
            .via(outputFn, Contextful.fn(new SinkFn()))
            .withTempDirectory(avroTempDirectory)
            .withDestinationCoder(destinationCoder)
            .withNumShards(1)
            .withNaming(namingFn));

We use custom file naming, in a format like gs://tenantID.<>/eventname/dddd-mm-dd/<uniq_id-shardIndex-of-numOfShards-pane-paneIndex>.avro
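For context, a simplified sketch of what such a namingFn might look like (EventDestination and its getEventName()/getDate() accessors are placeholders rather than our real classes, and the returned name is resolved against the output directory configured on the write):

    import java.util.UUID;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.transforms.SerializableFunction;

    // Simplified sketch of per-destination naming; accessors are placeholders.
    SerializableFunction<EventDestination, FileIO.Write.FileNaming> namingFn =
        dest -> (window, pane, numShards, shardIndex, compression) ->
            String.format("%s/%s/%s-%d-of-%d-pane-%d.avro",
                dest.getEventName(),   // e.g. "login"
                dest.getDate(),        // e.g. a date string for the path
                UUID.randomUUID(),     // unique id per file
                shardIndex, numShards, pane.getIndex());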


There are 2 answers

robertwb

As mentioned in the comments, the issue is likely withNumShards(1), which forces all of the writing to happen on one worker.
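For illustration, a minimal sketch of that change against the snippet in the question, keeping the asker's groupFn, outputFn and namingFn; the shard count of 10 is only an example to tune against your worker count:

    // Sketch of the question's write with a higher shard count so the
    // GroupIntoShards step can spread work across workers.
    return windowedWrites
        .apply("WriteToAvroGCS", FileIO.<EventDestination, Event>writeDynamic()
            .by(groupFn)
            .via(outputFn, Contextful.fn(new SinkFn()))
            .withTempDirectory(avroTempDirectory)
            .withDestinationCoder(destinationCoder)
            .withNumShards(10) // was 1; a single shard funnels all data to one worker
            .withNaming(namingFn));

More shards mean more, smaller files per pane, so pick a value that balances parallelism against the file sizes you want downstream; newer Beam SDKs also offer runner-determined auto-sharding for streaming writes, but check whether your version supports it.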

Iñigo

As Robert said, when using withNumShards(1), Dataflow/Beam cannot parallelize the writing, so it all happens on the same worker. When the bundles are relatively large, this has a big impact on the performance of the pipeline. I made an example to demonstrate this:

I ran 3 pipelines that generate a lot of elements (~2 GB), each with 10 n1-standard-1 workers, but with 1 shard, 10 shards and 0 shards respectively (with 0 shards, Dataflow chooses the number of shards). This is how they behaved:

[Screenshot: job list comparing the total run times of the three pipelines]

We see a big difference in total time between the 0-shard and 10-shard jobs and the 1-shard job. If we look at the job with 1 shard, we see that only one worker was doing anything (I disabled autoscaling):

[Screenshot: per-worker CPU utilization for the 1-shard job]

As Reza mentioned, this happens because all elements need to be shuffled onto the same worker so that it can write the single shard.

Note that my example is Batch, which behaves differently from Streaming when it comes to threading, but the effect on pipeline performance is similar enough (in fact, in Streaming it may be even worse).

Here is some Python code so you can test this yourself:

    import argparse
    import random

    import apache_beam as beam
    from apache_beam.io import WriteToText
    from apache_beam.options.pipeline_options import PipelineOptions

    # Command-line flags assumed by the snippet: --output and --shards.
    parser = argparse.ArgumentParser()
    parser.add_argument('--output', required=True)
    parser.add_argument('--shards', type=int, default=0)
    known_args, pipeline_args = parser.parse_known_args()
    pipeline_options = PipelineOptions(pipeline_args)

    p = beam.Pipeline(options=pipeline_options)

    def long_string_generator():
        # Sample 20 random words from a fixed sentence to build a long value.
        string = "Apache Beam is an open source, unified model for defining " \
                 "both batch and streaming data-parallel processing " \
                 "pipelines. Using one of the open source Beam SDKs, " \
                 "you build a program that defines the pipeline. The pipeline " \
                 "is then executed by one of Beam's supported distributed " \
                 "processing back-ends, which include Apache Flink, Apache " \
                 "Spark, and Google Cloud Dataflow. "

        word_choice = random.sample(string.split(" "), 20)

        return " ".join(word_choice)

    def generate_elements(element, amount=1):
        # Fan each input element out into `amount` (element, long string) pairs.
        return [(element, long_string_generator()) for _ in range(amount)]

    (p | beam.Create(range(1500))
       | beam.FlatMap(generate_elements, amount=10000)
       | WriteToText(known_args.output, num_shards=known_args.shards))

    p.run()
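To reproduce the comparison above, run the same pipeline three times, changing only --shards between 1, 10 and 0 (0 lets Dataflow pick the number of shards).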