We have an Apache Beam pipeline that reads messages from a given Kafka topic and does further processing. The pipeline uses the FlinkRunner, and below I have described three different cases that we have tried:

Case 1: No group id specified:

Beam creates a new consumer for every run and thus reads from the latest topic offset, i.e. it only reads messages produced after the consumer starts. This means there is potential data loss during the interval between stopping and restarting the pipeline.
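For concreteness, the read transform in all three cases looks roughly like the fragment below (the broker address, topic name, and key/value types are placeholders, not our real values). With no group.id set, the consumer has no committed offsets to resume from, so each run falls back to auto.offset.reset, which defaults to latest:

```java
// Sketch only: "broker:9092", "myTopic", and the deserializer types
// are illustrative placeholders.
pipeline.apply(KafkaIO.<Long, String>read()
    .withBootstrapServers("broker:9092")
    .withTopic("myTopic")
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class));
// ... further processing ...
```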

Case 2: Group id specified and enable.auto.commit set to true

On restart, Beam resumes from the last committed offset and re-processes the messages that were not yet committed to Kafka for the given group.id.

A new group id, however, again starts listening from the latest topic offset and begins committing offsets from there.

.withConsumerConfigUpdates(ImmutableMap.of("enable.auto.commit", true))
.withConsumerConfigUpdates(ImmutableMap.of("group.id", "testGroupId"))
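As far as I can tell, chained withConsumerConfigUpdates calls merge their maps rather than replace each other, so the same configuration can equivalently be written as a single update (here using the ConsumerConfig constants from kafka-clients for clarity; behavior should be identical to the two-call form above):

```java
// Equivalent single-map form of the two updates above (sketch).
.withConsumerConfigUpdates(ImmutableMap.of(
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true,
    ConsumerConfig.GROUP_ID_CONFIG, "testGroupId"))
```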

Case 3: Group id specified with commitOffsetsInFinalize()

Ideally I would expect the same behavior as Case 2 here, but instead I see behavior similar to Case 1: there is potential data loss between stopping and restarting the pipeline.

.withConsumerConfigUpdates(ImmutableMap.of("group.id", "testGroupId"))
.commitOffsetsInFinalize()

From the documentation of KafkaIO, I see that offsets are committed back to Kafka when checkpoints are finalized; see https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1098
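Since commits happen only when a checkpoint is finalized, here is a sketch of how checkpointing would be enabled on the FlinkRunner, in case the behavior we see depends on it (assuming FlinkPipelineOptions; the 60-second interval is illustrative, not our actual setting):

```java
// Sketch: enable Flink checkpointing so checkpoints get finalized.
// Without a checkpointing interval, commitOffsetsInFinalize() would
// have no finalized checkpoints to commit offsets on.
FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setCheckpointingInterval(60_000L);  // illustrative: every 60 s
```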

We would like to understand:

  1. Why is Case 3 not behaving like Case 2 when the pipeline is stopped and restarted?
  2. In which cases should we set enable.auto.commit to true vs. use commitOffsetsInFinalize()?