We currently have a Dataflow job that reads from Pub/Sub and writes Avro files to GCS using FileIO.writeDynamic. When we test at around 10,000 events/sec, the pipeline cannot keep up: the WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards step is very slow. Below is the snippet we use to write. How can we improve this?
PCollection<Event> windowedWrites = input.apply("Global Window",
    Window.<Event>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterFirst.of(
                AfterPane.elementCountAtLeast(50000),
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(DurationUtils.parseDuration(windowDuration)))))
        // Beam requires allowed lateness to be set whenever a non-default trigger is used
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

return windowedWrites
    .apply("WriteToAvroGCS", FileIO.<EventDestination, Event>writeDynamic()
        .by(groupFn)
        .via(outputFn, Contextful.fn(new SinkFn()))
        .withTempDirectory(avroTempDirectory)
        .withDestinationCoder(destinationCoder)
        .withNumShards(1)
        .withNaming(namingFn));
We use custom file naming that produces paths of the form gs://tenantID.<>/eventname/dddd-mm-dd/<uniq_id-shardIndex-of-numOfShards-pane-paneIndex>.avro.
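For reference, a naming function of that shape can be built with FileIO.Write.FileNaming, which is a functional interface. The sketch below is only an illustration: the EventDestination accessors (getTenantId(), getEventName()) and the date component are assumptions rather than the actual code, and the gs://tenantID.<> bucket prefix is assumed to come from the write's output directory.

import java.time.LocalDate;
import java.util.UUID;
import org.apache.beam.sdk.io.FileIO;

// Sketch only: EventDestination getters and the date format are assumed.
static FileIO.Write.FileNaming namingFor(EventDestination dest) {
  return (window, pane, numShards, shardIndex, compression) ->
      String.format("%s/%s/%s/%s-%d-of-%d-pane-%d.avro",
          dest.getTenantId(),      // tenant component of the path
          dest.getEventName(),     // eventname
          LocalDate.now(),         // date folder (assumed format)
          UUID.randomUUID(),       // uniq_id
          shardIndex, numShards,   // shardIndex-of-numOfShards
          pane.getIndex());        // paneIndex
}

This plugs into the write as .withNaming(dest -> namingFor(dest)), matching the SerializableFunction<EventDestination, FileNaming> overload of withNaming.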
As mentioned in the comments, the issue is likely

withNumShards(1)

which forces all elements for a destination to be grouped into a single shard, so GroupIntoShards funnels the entire stream through one worker and the write cannot parallelize. Raising the shard count restores parallelism.
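A minimal sketch of the fix, keeping the rest of the pipeline unchanged. The shard count of 10 is an arbitrary starting point to tune against your throughput; on newer Beam releases, withAutoSharding() is an alternative that lets the runner pick the sharding for streaming writes.

return windowedWrites
    .apply("WriteToAvroGCS", FileIO.<EventDestination, Event>writeDynamic()
        .by(groupFn)
        .via(outputFn, Contextful.fn(new SinkFn()))
        .withTempDirectory(avroTempDirectory)
        .withDestinationCoder(destinationCoder)
        // Spread the grouped writes across workers instead of funneling
        // everything through a single shard per destination.
        .withNumShards(10)
        .withNaming(namingFn));

More shards mean more (smaller) files per pane, but since your naming scheme already encodes shardIndex-of-numOfShards, the filenames remain unique.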