Apache Flink - streaming app doesn't start from checkpoint after stop and start

I have the following Flink streaming application running locally, written with the SQL API:

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object StreamingKafkaJsonsToCsvLocalFs {

  val brokers = "localhost:9092"
  val topic = "test-topic"
  val consumerGroupId = "test-consumer"
  val kafkaTableName = "KafkaTable"
  val targetTable = "TargetCsv"
  val targetPath = f"file://${new java.io.File(".").getCanonicalPath}/kafka-to-fs-csv"

  def generateKafkaTableDDL(): String = {
    s"""
       |CREATE TABLE $kafkaTableName (
       |  `kafka_offset` BIGINT METADATA FROM 'offset',
       |  `seller_id` STRING
       |) WITH (
       |  'connector' = 'kafka',
       |  'topic' = '$topic',
       |  'properties.bootstrap.servers' = '$brokers',
       |  'properties.group.id' = '$consumerGroupId',
       |  'scan.startup.mode' = 'earliest-offset',
       |  'format' = 'json'
       |)
       |""".stripMargin
  }

  def generateTargetTableDDL(): String = {
    s"""
       |CREATE TABLE $targetTable (
       |  `kafka_offset` BIGINT,
       |  `seller_id` STRING
       |  )
       |WITH (
       |  'connector' = 'filesystem',
       |  'path' = '$targetPath',
       |  'format' = 'csv',
       |  'sink.rolling-policy.rollover-interval' = '10 seconds',
       |  'sink.rolling-policy.check-interval' = '1 seconds'
       |)
       |""".stripMargin
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    // Checkpoint every second with exactly-once semantics, storing checkpoint
    // data on the local filesystem next to the CSV output.
    env.enableCheckpointing(1000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointStorage(s"$targetPath/checkpoints")

    val settings = EnvironmentSettings.newInstance()
      .inStreamingMode()
      .build()

    val tblEnv = StreamTableEnvironment.create(env, settings)

    tblEnv.executeSql(generateKafkaTableDDL())
    tblEnv.executeSql(generateTargetTableDDL())

    // Submit the INSERT pipeline and block; for an unbounded streaming job this
    // only returns when the job is cancelled or fails.
    tblEnv.from(kafkaTableName).executeInsert(targetTable).await()
  }
}

As you can see, checkpointing is enabled, and when I execute the application I can see that the checkpoint folder is created and populated.

The problem I am facing is this: when I stop and start my application (from the IDE), I expect it to resume from the point where the previous execution stopped, but instead it consumes all the offsets from the earliest offset in the topic (I can see this in the newly generated output files, which contain offset zero even though the previous run already processed those offsets).

What am I missing about checkpointing in Flink? I would expect it to be exactly-once.

1 Answer

David Anderson (best answer):

Flink only restarts from a checkpoint when recovering from a failure, or when explicitly restarted from a retained checkpoint via the command line or REST API. Otherwise, the Kafka source starts from whatever position is configured in the code; your DDL sets 'scan.startup.mode' = 'earliest-offset', so every fresh run begins at the earliest offsets.
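If you do want stop-and-restart behavior on a real cluster, you first need to retain checkpoints across cancellation. A minimal sketch, assuming Flink 1.15+ (where setExternalizedCheckpointCleanup replaced the older enableExternalizedCheckpoints):

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup

// Keep completed checkpoints when the job is cancelled instead of deleting them.
// A retained checkpoint can then be used for an explicit restart, e.g.
// `flink run -s <path-to-retained-checkpoint> ...` against a cluster.
env.getCheckpointConfig.setExternalizedCheckpointCleanup(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)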

If you have no other state, you could instead rely on the committed offsets as the source of truth, and configure the Kafka connector to use the committed offsets as the starting position.
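For example, here is a sketch of the question's Kafka DDL with the startup mode switched to the consumer group's committed offsets. 'group-offsets' and the pass-through 'properties.auto.offset.reset' option are standard Kafka SQL connector options, and with checkpointing enabled the connector commits offsets back to Kafka on each completed checkpoint:

s"""
   |CREATE TABLE $kafkaTableName (
   |  `kafka_offset` BIGINT METADATA FROM 'offset',
   |  `seller_id` STRING
   |) WITH (
   |  'connector' = 'kafka',
   |  'topic' = '$topic',
   |  'properties.bootstrap.servers' = '$brokers',
   |  'properties.group.id' = '$consumerGroupId',
   |  -- start reading from the offsets committed by this consumer group
   |  'scan.startup.mode' = 'group-offsets',
   |  -- fall back to the earliest offset when the group has no committed offsets yet
   |  'properties.auto.offset.reset' = 'earliest',
   |  'format' = 'json'
   |)
   |""".stripMargin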


Flink's fault tolerance via checkpointing isn't designed to support mini-cluster deployments like the one used when running in an IDE. Normally the job manager and task managers run in separate processes, so the job manager can detect that a task manager has failed and arrange for a restart from the latest checkpoint. Stopping the application in the IDE tears down the whole mini-cluster, leaving no failure for Flink to recover from.
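If you nevertheless want to experiment with restoring from a checkpoint inside the IDE, one workaround is to point the local environment at a concrete checkpoint from the previous run via the execution.savepoint.path option. A sketch, where the <job-id>/chk-<n> part is a placeholder you would copy from your checkpoints directory:

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val conf = new Configuration()
// Placeholder path: substitute the job id and checkpoint number from the previous run.
conf.setString("execution.savepoint.path", s"$targetPath/checkpoints/<job-id>/chk-<n>")
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)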