Estimating Watermark for Event Time in Beam


I'm trying to use Beam to aggregate over a set of data, using event time taken from the data and Kafka as the data source. This works as long as all my Kafka partitions are populated with data. However, as soon as a partition has not yet been written to, the watermark can't be estimated and advanced. My timestamp policy is the following:

import java.util.Optional;

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

public class CustomTimeStampPolicy
    extends TimestampPolicy<String, titan.ccp.model.records.ActivePowerRecord> {

  protected Instant currentWatermark;

  public CustomTimeStampPolicy(final Optional<Instant> previousWatermark) {
    // Resume from the previously checkpointed watermark if one exists,
    // otherwise start at the minimum possible timestamp.
    this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
  }

  @Override
  public Instant getTimestampForRecord(final PartitionContext ctx,
      final KafkaRecord<String, titan.ccp.model.records.ActivePowerRecord> record) {
    // Use the event timestamp carried in the record and remember it as the current watermark.
    this.currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
    return this.currentWatermark;
  }

  @Override
  public Instant getWatermark(final PartitionContext ctx) {
    System.out.println("Current Watermark: " + this.currentWatermark);
    return this.currentWatermark;
  }
}
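
For reference, a policy like this is plugged into the Kafka source via withTimestampPolicyFactory. The snippet below is only a sketch of that wiring; the bootstrap servers, topic name, and ActivePowerRecordDeserializer are placeholders, not my actual configuration:

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;

// Placeholder bootstrap servers, topic name, and value deserializer.
pipeline.apply(KafkaIO.<String, titan.ccp.model.records.ActivePowerRecord>read()
    .withBootstrapServers("localhost:9092")
    .withTopic("active-power-records")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(ActivePowerRecordDeserializer.class)
    // The factory receives the partition and the previously checkpointed watermark (if any).
    .withTimestampPolicyFactory(
        (topicPartition, previousWatermark) -> new CustomTimeStampPolicy(previousWatermark))
    .withoutMetadata());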

With three Kafka partitions, only one of which is populated with data, my logs show these watermarks:

Current Watermark: -290308-12-21T19:59:05.225Z
Current Watermark: 2020-12-09T10:42:29.909Z
Current Watermark: -290308-12-21T19:59:05.225Z

With default triggering my windows won't fire. My guess is that the output watermark is the minimum over the watermarks of all partitions and therefore won't advance as long as some of my partitions are empty. How can I handle empty partitions with event-time processing?

1 Answer

Answered by robertwb:

If no data has been written to a Kafka partition, Beam has no way of knowing that, once an element is written, it won't have a timestamp arbitrarily far in the past; hence the very old watermark.

You could try updating your timestamp policy constructor to use previousWatermark.orElse(wallTime - someMaximumSkew),

where someMaximumSkew is the largest delay you could expect to see for data written to Kafka. You could also consider taking a min of the previous value (if any) and wallTime - someMaximumSkew to advance when no data has been written for a while.
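
For illustration, here is a minimal sketch of one way these suggestions could look, assuming no record arrives more than a fixed maximum skew behind wall time; the MAX_SKEW constant and its five-minute value are placeholders you would tune for your data:

import java.util.Optional;

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class CustomTimeStampPolicy
    extends TimestampPolicy<String, titan.ccp.model.records.ActivePowerRecord> {

  // Placeholder: the largest delay you expect between an event's timestamp
  // and the moment it is written to Kafka.
  private static final Duration MAX_SKEW = Duration.standardMinutes(5);

  private Instant currentWatermark;

  public CustomTimeStampPolicy(final Optional<Instant> previousWatermark) {
    // Fall back to wall time minus the maximum skew instead of TIMESTAMP_MIN_VALUE,
    // so an empty partition does not pin the watermark to the distant past.
    this.currentWatermark = previousWatermark.orElse(Instant.now().minus(MAX_SKEW));
  }

  @Override
  public Instant getTimestampForRecord(final PartitionContext ctx,
      final KafkaRecord<String, titan.ccp.model.records.ActivePowerRecord> record) {
    final Instant eventTime = new Instant(record.getKV().getValue().getTimestamp());
    // Only move the watermark forward; a late record must not pull it back.
    if (eventTime.isAfter(this.currentWatermark)) {
      this.currentWatermark = eventTime;
    }
    return eventTime;
  }

  @Override
  public Instant getWatermark(final PartitionContext ctx) {
    // If the partition has been idle for a while, let the watermark follow
    // wall time minus the maximum skew, without ever going backwards.
    final Instant idleWatermark = Instant.now().minus(MAX_SKEW);
    if (idleWatermark.isAfter(this.currentWatermark)) {
      this.currentWatermark = idleWatermark;
    }
    return this.currentWatermark;
  }
}

Note that this trades completeness for liveness: any record that does arrive with a timestamp older than the maximum skew will be late relative to the watermark. Beam's built-in CustomTimestampPolicyWithLimitedDelay follows a similar idea, if you would rather not roll your own.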