I have developed a sample Beam pipeline in Python that receives data from a Pub/Sub subscription. Each data element is a person's name with their age, and the objective is to count how many people over a certain age fall inside a fixed window.
The FixedWindows size is set to 30 seconds, with no additional configuration.
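For reference, a fixed window is assigned from each element's event timestamp, not from the wall-clock time at which the element arrives. A minimal pure-Python sketch of that assignment, assuming Beam's default window offset of 0 (`fixed_window_for` is a hypothetical helper, not a Beam API):

```python
def fixed_window_for(timestamp_s, size_s=30, offset_s=0):
    """Return the [start, end) bounds of the fixed window containing timestamp_s.

    Hypothetical helper mirroring the arithmetic used for fixed windows:
    start = timestamp - ((timestamp - offset) mod size).
    """
    start = timestamp_s - (timestamp_s - offset_s) % size_s
    return (start, start + size_s)


# Timestamps only 2 s apart (59 and 61) land in different windows.
for ts in (0, 29, 30, 59, 61):
    print(ts, fixed_window_for(ts))
```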
The problem is that output is emitted at seemingly random times: after the first output, the pipeline emits several more results (5 or 6) between the current window and the next one, which should not emit a result until the 60-second mark.
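```python
import json

import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub, WriteStringsToPubSub
from apache_beam.transforms import window

# format_message, AddTimestampDoFn, filter_names, CollectTimings, printresults,
# known_args and pipeline_options are defined elsewhere (not shown).
with beam.Pipeline(options=pipeline_options) as p:
    data = p | ReadFromPubSub(topic=known_args.input, with_attributes=True,
                              timestamp_attribute="timestamp")
    transformed = (data
        | 'FormatMessage' >> beam.Map(format_message)
        | 'Add Timestamp: %s' >> beam.ParDo(AddTimestampDoFn())
        # 30-second fixed windows with the default trigger.
        | beam.WindowInto(window.FixedWindows(30))
        | "Filter" >> beam.Filter(filter_names, known_args.rules)
        | "ReMap" >> beam.Map(lambda x: x['data'])
        | beam.ParDo(CollectTimings())
        # Produces one (key, count) result per key per window.
        | 'Group' >> beam.GroupByKey()
        | 'Count' >> beam.CombineValues(beam.combiners.CountCombineFn())
    )
    serialized = (transformed
        | beam.Map(json.dumps)
        | beam.Map(printresults)
    )
    serialized | "Write To PubSub" >> WriteStringsToPubSub(known_args.output)
```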
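To make the per-key, per-window semantics of the `GroupByKey` / `CombineValues` steps concrete, here is a pure-Python emulation of the same shape of computation. `CollectTimings` and `filter_names` are not shown, so this assumes, hypothetically, that the key is the person's name and that the filter keeps people strictly older than a threshold; the records and helper names are illustrative only.

```python
from collections import Counter

WINDOW_SIZE_S = 30

# Hypothetical (event_timestamp_s, key, age) records. In the real pipeline
# the key comes from CollectTimings, which is not shown.
records = [
    (3, "alice", 25),
    (12, "bob", 40),
    (14, "alice", 31),
    (31, "carol", 19),
    (45, "dave", 17),
]


def window_start(ts, size=WINDOW_SIZE_S):
    # Same arithmetic as fixed windows with offset 0.
    return ts - ts % size


def count_per_key_per_window(rows, min_age):
    # Filter first (like the "Filter" step), then count per (window, key)
    # pair (like GroupByKey followed by CountCombineFn).
    counts = Counter(
        (window_start(ts), key) for ts, key, age in rows if age > min_age
    )
    return sorted(counts.items())


for (start, key), n in count_per_key_per_window(records, min_age=18):
    print(f"window [{start}, {start + WINDOW_SIZE_S}) key={key!r} count={n}")
```

Here window [0, 30) yields two records, one for `'alice'` and one for `'bob'`: after a `GroupByKey`, a single window produces one output per distinct key it contains.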
From what I understand of the Beam documentation, I should receive output every 30 seconds (provided there is at least one input element), but I'm getting multiple outputs within a single window.
What could be causing this behaviour?
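Under Beam's default trigger, a window's results are emitted once the watermark passes the end of the window. The watermark tracks event time, so when it advances depends on the timestamps of the incoming data rather than on a fixed 30-second wall-clock schedule. A simplified, hypothetical simulation of that firing rule (it ignores allowed lateness and late data; `fire_on_watermark` and the watermark values are made up for illustration):

```python
def fire_on_watermark(pending_windows, watermark_updates):
    """Return (watermark, window) pairs in the order windows would fire.

    pending_windows: iterable of (start, end) windows that hold data.
    watermark_updates: successive watermark values, in seconds of event time.
    Simplified sketch of the default watermark trigger: each window fires
    once, when the watermark reaches its (exclusive) end; no early or late
    firings and no allowed lateness.
    """
    remaining = sorted(set(pending_windows), key=lambda w: w[1])
    fired = []
    for wm in watermark_updates:
        # Fire every window whose end the watermark has now reached.
        while remaining and remaining[0][1] <= wm:
            fired.append((wm, remaining.pop(0)))
    return fired


print(fire_on_watermark([(0, 30), (30, 60), (60, 90)], [10, 35, 36, 95]))
```

In this made-up sequence, the watermark jumps from 36 to 95, so two windows fire on the same update: results need not be spaced 30 seconds apart in wall-clock time.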