Our Beam pipeline reads from Kafka. Apache Beam's KafkaIO connector is supposed to advance the watermark (on the Flink runner, in our case) even when a partition is idle. Applications that need to process packets by the timestamp carried in the payload are expected to use "CustomTimestampPolicyWithLimitedDelay". We aggregate over fixed one-minute windows, which depend on event time advancing. If the watermark does not advance, the aggregation function is never called and the data is missed.
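To make the dependency on the watermark concrete, here is a minimal sketch of one-minute fixed windows in plain Java. It is not Beam code: the class and method names are hypothetical, and it assumes default triggering, where a window's result is emitted only once the watermark reaches the end of the window.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model of one-minute fixed windows. Illustrative only, not Beam code.
class FixedWindowModel {
    static final Duration SIZE = Duration.ofMinutes(1);

    // Start of the fixed window that contains the given event timestamp.
    static Instant windowStart(Instant eventTime) {
        long ts = eventTime.toEpochMilli();
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, SIZE.toMillis()));
    }

    // Exclusive end of the window that contains the given event timestamp.
    static Instant windowEnd(Instant eventTime) {
        return windowStart(eventTime).plus(SIZE);
    }

    // ASSUMPTION: default triggering, so the aggregation for a window runs
    // only once the watermark has reached the window end.
    static boolean canFire(Instant eventTime, Instant watermark) {
        return !watermark.isBefore(windowEnd(eventTime));
    }

    public static void main(String[] args) {
        Instant event = Instant.parse("2020-01-01T10:00:42Z");
        System.out.println(windowStart(event)); // 2020-01-01T10:00:00Z
        System.out.println(windowEnd(event));   // 2020-01-01T10:01:00Z
        // Watermark stuck before the window end: no aggregation yet.
        System.out.println(canFire(event, Instant.parse("2020-01-01T10:00:59Z"))); // false
        // Watermark has reached the window end: aggregation can run.
        System.out.println(canFire(event, Instant.parse("2020-01-01T10:01:00Z"))); // true
    }
}
```

In this model a watermark that never moves means canFire stays false indefinitely, which is the symptom we see.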
This API does not behave as expected. Suppose that, when the application starts, topic "a" with three partitions is used as the source. The following steps reproduce the issue:
- Publish data to only one partition, at any interval of x seconds. Observation: the aggregation function is not called, even after several minutes.
- Now publish data to all partitions. Observation: the aggregation function is called at the end of the minute, as expected.
- Now publish data to only one partition again, and stop shortly before the end of the minute so that the other partitions are idle. Observation: the aggregation function is now called as expected.
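The three observations can be mimicked with a toy model in plain Java. This is a hypothesis that fits the steps above, not a description of Beam's internals: it assumes (unverified here) that the source watermark is the minimum over all partitions, and that a partition's watermark only starts advancing while idle after that partition has delivered at least one record. The class name, the 5-second maxDelay, and all timestamps are made up for illustration.

```java
import java.time.Duration;
import java.time.Instant;

// Toy model of per-partition watermarks. Hypothetical, not Beam's implementation.
class IdlePartitionModel {
    private final Duration maxDelay;
    private final Instant[] maxEventTime; // null until a partition delivers its first record

    IdlePartitionModel(int partitions, Duration maxDelay) {
        this.maxDelay = maxDelay;
        this.maxEventTime = new Instant[partitions];
    }

    // Track the largest event timestamp seen on a partition.
    void onRecord(int partition, Instant eventTime) {
        Instant seen = maxEventTime[partition];
        if (seen == null || eventTime.isAfter(seen)) {
            maxEventTime[partition] = eventTime;
        }
    }

    // Watermark of one caught-up partition at wall-clock time `now`.
    Instant partitionWatermark(int partition, Instant now) {
        Instant seen = maxEventTime[partition];
        if (seen == null) {
            return Instant.MIN; // ASSUMPTION: a never-read partition holds the watermark back
        }
        Instant idle = now.minus(maxDelay);      // advance while idle
        Instant fromData = seen.minus(maxDelay); // advance from data
        return idle.isAfter(fromData) ? idle : fromData;
    }

    // ASSUMPTION: the source watermark is the minimum across partitions.
    Instant sourceWatermark(Instant now) {
        Instant min = Instant.MAX;
        for (int p = 0; p < maxEventTime.length; p++) {
            Instant w = partitionWatermark(p, now);
            if (w.isBefore(min)) {
                min = w;
            }
        }
        return min;
    }

    // True once the source watermark has reached the given window end.
    boolean windowCanFire(Instant windowEnd, Instant now) {
        return !sourceWatermark(now).isBefore(windowEnd);
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2020-01-01T10:00:00Z");
        IdlePartitionModel m = new IdlePartitionModel(3, Duration.ofSeconds(5));

        // Step 1: data on partition 0 only; check five minutes later.
        m.onRecord(0, t0.plusSeconds(10));
        System.out.println(m.windowCanFire(t0.plusSeconds(60), t0.plusSeconds(300))); // false

        // Step 2: data on all partitions; check one minute later.
        for (int p = 0; p < 3; p++) {
            m.onRecord(p, t0.plusSeconds(310));
        }
        System.out.println(m.windowCanFire(t0.plusSeconds(360), t0.plusSeconds(370))); // true

        // Step 3: data on partition 0 only again; the other partitions are idle.
        m.onRecord(0, t0.plusSeconds(450));
        System.out.println(m.windowCanFire(t0.plusSeconds(480), t0.plusSeconds(490))); // true
    }
}
```

If this hypothesis holds, the fix would be to let a partition that has not yet delivered any record advance its watermark while idle, the same way a partition that has already delivered records does.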
In summary, there appears to be an initialization issue in this API: the watermark does not advance at first, but after step 2 it stabilizes and works as expected.
This is easily reproducible, and we request that the Apache Beam project fix it.
As a temporary workaround we have switched to LogAppendTime, which works flawlessly, but our application requirements mean we do not want to process packets on broker time.