Flink two phase commit for map function to implement exactly-once semantics

Background:

We have a Flink pipeline which consists of multiple sources, multiple sinks and multiple operators along the pipeline which also update databases.

For the sake of the question and to make it simpler let's assume we have a pipeline which looks like so:

Source -> KeyBy -> FlatMap -> Filter -> Sink

This pipeline is supposed to let us listen to notifications about changes in some data. (Each notification contains an ID.) For each notification, we read data from the DB, run an algorithm and update the same DB row. After that we also emit the magnitude of the change in the data. Only if the change magnitude is large enough do we emit a notification to another Kafka topic.

  • The Source subscribes to a Kafka topic to listen for the notifications on the changed data IDs.
  • The KeyBy is keying by the ID to make sure the same ID is not processed by 2 instances of the operators at the same time.
  • Given the ID, the FlatMap reads the data from the DB, runs an algorithm and updates the same DB row. It emits the change magnitude. It is a FlatMap and not a Map because in some cases we don't want to emit any change magnitude, for example if we had some specific errors.
  • The Filter drops magnitudes less than some threshold, so only large enough changes continue downstream.
  • The Sink is sending the filtered notifications to another Kafka topic.
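
To make the side effect in the FlatMap step explicit, the steps above can be modeled in plain Java without any Flink dependency. This is only a sketch: the in-memory map standing in for the DB, the list standing in for the output Kafka topic, the `runAlgorithm` doubling rule and the threshold value are all hypothetical placeholders, not our real code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of Source -> KeyBy -> FlatMap -> Filter -> Sink (no Flink dependency).
class PipelineSketch {
    // Hypothetical stand-in for the external DB: ID -> current value.
    final Map<String, Double> db = new HashMap<>();
    // Hypothetical stand-in for the output Kafka topic.
    final List<String> outputTopic = new ArrayList<>();
    final double threshold;

    PipelineSketch(double threshold) {
        this.threshold = threshold;
    }

    // Placeholder algorithm (assumption): doubles the stored value.
    static double runAlgorithm(double oldValue) {
        return oldValue * 2.0;
    }

    // FlatMap step: read the row, update it, emit the change magnitude.
    // Returns empty when nothing should be emitted (here: unknown ID).
    Optional<Double> flatMap(String id) {
        Double oldValue = db.get(id);
        if (oldValue == null) {
            return Optional.empty();
        }
        double newValue = runAlgorithm(oldValue);
        db.put(id, newValue); // side effect outside the checkpointed state
        return Optional.of(Math.abs(newValue - oldValue));
    }

    // Filter + Sink: forward only magnitudes at or above the threshold.
    void process(String id) {
        flatMap(id)
            .filter(m -> m >= threshold)
            .ifPresent(m -> outputTopic.add(id + ":" + m));
    }
}
```

In this model, calling `process` twice for the same notification applies the DB update twice. That is what happens when records are replayed after a failure, and it is the gap the question is about.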

Question:

We want to run the pipeline with exactly-once semantics. From what we see, Flink supports exactly-once semantics for Kafka sources, for Kafka sinks and for stateful or stateless operators in the middle. We couldn't find anything explaining how to achieve exactly-once for external resources that are updated along the pipeline. There is a TwoPhaseCommitSinkFunction that allows creating a sink function with exactly-once semantics.
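
For reference, here is how we understand the two-phase commit idea behind such a sink, as a simplified plain-Java model. It is not Flink's actual class: the method names loosely mirror the `invoke`, `preCommit`, `commit` and `abort` hooks, and the `committed` list is a hypothetical stand-in for the external system.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a two-phase commit sink; not Flink's actual class.
class TwoPhaseCommitSketch {
    // Hypothetical external system: only committed records are visible here.
    final List<String> committed = new ArrayList<>();
    private List<String> currentTxn = new ArrayList<>();
    private List<String> pendingTxn = null;

    // Records are buffered in the open transaction and are not yet visible.
    void invoke(String value) {
        currentTxn.add(value);
    }

    // Phase 1 (on checkpoint): freeze the open transaction and start a new one.
    void preCommit() {
        pendingTxn = currentTxn;
        currentTxn = new ArrayList<>();
    }

    // Phase 2 (once the checkpoint is confirmed): make the frozen records visible.
    void commit() {
        if (pendingTxn != null) {
            committed.addAll(pendingTxn);
            pendingTxn = null;
        }
    }

    // On failure before the checkpoint is confirmed: discard uncommitted records.
    void abort() {
        pendingTxn = null;
        currentTxn = new ArrayList<>();
    }
}
```

The point of the model is that nothing becomes visible externally until the checkpoint is confirmed. A DB update performed directly inside a FlatMap has no equivalent deferral.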

We cannot use it because we want to update the database and after that emit a change notification to Kafka. Doing it in 2 separate sinks will create race conditions where we can receive a magnitude notification before the DB is actually updated.

Are we missing something? Is there a way to implement 2 phase commits in Map/FlatMap operators? Is there another solution?

Thanks!
