I'm trying to use Beam to aggregate over a set of data using event time from the data, with Kafka as the data source. This works if all my Kafka partitions are populated with data. However, as soon as a partition has not yet been written to, the watermark can't be estimated and advanced. My `TimestampPolicy` is the following:
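```java
import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

public class CustomTimeStampPolicy
    extends TimestampPolicy<String, titan.ccp.model.records.ActivePowerRecord> {

  protected Instant currentWatermark;

  public CustomTimeStampPolicy(final Optional<Instant> previousWatermark) {
    // Without a checkpointed watermark, start at the minimum possible timestamp
    this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
  }

  @Override
  public Instant getTimestampForRecord(final PartitionContext ctx,
      final KafkaRecord<String, titan.ccp.model.records.ActivePowerRecord> record) {
    // Use the event time carried in the record and treat it as the new watermark
    this.currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
    return this.currentWatermark;
  }

  @Override
  public Instant getWatermark(final PartitionContext ctx) {
    System.out.println("Current Watermark: " + this.currentWatermark);
    return this.currentWatermark;
  }
}
```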
With 3 Kafka partitions, only one of which is populated with data, my logs show these watermarks:
```
Current Watermark: -290308-12-21T19:59:05.225Z
Current Watermark: 2020-12-09T10:42:29.909Z
Current Watermark: -290308-12-21T19:59:05.225Z
```
With default triggering, my windows won't fire. My guess is that the output watermark is the minimum over the watermarks of the partitions, and therefore won't advance as long as some of my partitions are empty. How can I handle empty partitions with event-time processing?
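If no data has been written to a Kafka partition, Beam has no way of knowing that the next element written to it won't have a timestamp arbitrarily far in the past. That is why the empty partitions report the very old watermark (`BoundedWindow.TIMESTAMP_MIN_VALUE`, which prints as `-290308-12-21T19:59:05.225Z`).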
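To see why a single empty partition blocks everything, the question's guess can be modeled without Beam. This is a simplified sketch, not KafkaIO's actual code: it assumes the reader reports the minimum of the per-partition watermarks, uses `java.time.Instant.MIN` as a stand-in for `BoundedWindow.TIMESTAMP_MIN_VALUE`, and the class `PartitionWatermarks` is hypothetical.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, Beam-free model: each partition keeps its own watermark and the
 * reader reports the minimum across partitions (the assumption from the question).
 */
final class PartitionWatermarks {
    // Stand-in for BoundedWindow.TIMESTAMP_MIN_VALUE, i.e. "nothing seen yet".
    static final Instant NO_DATA = Instant.MIN;

    private final List<Instant> watermarks = new ArrayList<>();

    PartitionWatermarks(int partitions) {
        for (int i = 0; i < partitions; i++) {
            watermarks.add(NO_DATA); // every partition starts empty
        }
    }

    /** Like the question's policy: the partition watermark becomes the last record's event time. */
    void onRecord(int partition, Instant eventTime) {
        watermarks.set(partition, eventTime);
    }

    /** Combined watermark: the earliest of all partition watermarks. */
    Instant combined() {
        Instant min = Instant.MAX;
        for (Instant w : watermarks) {
            if (w.isBefore(min)) {
                min = w;
            }
        }
        return min;
    }

    public static void main(String[] args) {
        PartitionWatermarks model = new PartitionWatermarks(3);
        model.onRecord(1, Instant.parse("2020-12-09T10:42:29.909Z"));
        // Partitions 0 and 2 never received data, so the combined watermark stays at NO_DATA.
        System.out.println(model.combined().equals(NO_DATA)); // prints "true"
    }
}
```

As long as any partition still reports `NO_DATA`, the minimum cannot move, so no event-time window can be considered complete.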
You could try updating your timestamp policy constructor to use `previousWatermark.orElse(wallTime - someMaximumSkew)`, where `someMaximumSkew` is the largest delay you could expect to see for data written to Kafka. You could also consider returning the max of the previous value (if any) and `wallTime - someMaximumSkew` from `getWatermark`, so that the watermark advances when no data has been written for a while.
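The suggestion can be sketched as a small, Beam-free model of one partition's watermark. This is a sketch under stated assumptions, not a drop-in `TimestampPolicy`: the class `SkewBoundedWatermark` and its methods are hypothetical, `maxSkew` plays the role of `someMaximumSkew`, it uses `java.time` instead of Joda types so it runs on its own, and wall time is passed in explicitly (in a real policy it would presumably come from something like `Instant.now()`). It also keeps the watermark monotonic by taking the max with each record's timestamp, which the question's policy does not do.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

/**
 * Hypothetical model of the suggested watermark logic for one partition.
 * Assumption: every record arrives within maxSkew of its event time.
 */
final class SkewBoundedWatermark {
    private final Duration maxSkew;
    private Instant currentWatermark;

    SkewBoundedWatermark(Optional<Instant> previousWatermark, Instant wallTime, Duration maxSkew) {
        this.maxSkew = maxSkew;
        // Without a checkpoint, start at wallTime - maxSkew instead of the minimum timestamp.
        this.currentWatermark = previousWatermark.orElse(wallTime.minus(maxSkew));
    }

    /** Returns the record's event time and moves the watermark forward to it (never backward). */
    Instant onRecord(Instant eventTime) {
        currentWatermark = max(currentWatermark, eventTime);
        return eventTime;
    }

    /** Later of the last watermark and wallTime - maxSkew, so an idle partition still advances. */
    Instant getWatermark(Instant wallTime) {
        currentWatermark = max(currentWatermark, wallTime.minus(maxSkew));
        return currentWatermark;
    }

    private static Instant max(Instant a, Instant b) {
        return a.isAfter(b) ? a : b;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2020-12-09T10:00:00Z");
        SkewBoundedWatermark empty =
            new SkewBoundedWatermark(Optional.empty(), start, Duration.ofMinutes(5));
        // No records ever arrive, yet the watermark follows wall time minus the skew.
        System.out.println(empty.getWatermark(start.plus(Duration.ofMinutes(10)))); // prints 2020-12-09T10:05:00Z
    }
}
```

Because an empty partition now reports `wallTime - maxSkew` rather than the minimum timestamp, the minimum across partitions can move forward and default-triggered windows can fire. The trade-off is that a record arriving more than `maxSkew` after its event time would already be behind the watermark and be treated as late data.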