Spark Structured Streaming from Kafka - last message processed again after resume from checkpoint


I'm using the brand new (and tagged "alpha") Structured Streaming of Spark 2.0.2 to read messages from a Kafka topic and update a couple of Cassandra tables from it:

val readStream = sparkSession.readStream
  .format("kafka")
  .option("subscribe", "maxwell")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load
  .as[KafkaMessage]
  .map(<transform KafkaMessage to Company>)

val writeStream = readStream
  .writeStream
  .queryName("CompanyUpdatesInCassandra")
  .foreach(new ForeachWriter[Company] {
    def open(partitionId: Long, version: Long): Boolean = {
      true
    }

    def process(company: Company): Unit = {
      ...
    }

    def close(errorOrNull: Throwable): Unit = {}
  })
  .start

writeStream.awaitTermination

I also configured a checkpoint location ("spark.sql.streaming.checkpointLocation") on the sparkSession. This allows me to pick up messages that arrived while the streaming app was down as soon as it resumes.
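For reference, a sketch of how that session setup can look; the local master and the checkpoint path are placeholders, not values from the question. The same setting can also be applied per query via `.option("checkpointLocation", ...)` on the `writeStream`:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: master and checkpoint path are example values.
val sparkSession = SparkSession.builder
  .master("local[*]")
  .appName("CompanyUpdatesInCassandra")
  .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints")
  .getOrCreate()
```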

However, since configuring this checkpoint location I noticed that on resume it also consistently reprocesses the last message of the previous batch, even though that message was already processed correctly without failure.

Any idea what I'm doing wrong here? This seems like a very common use case.

More info:

See the relevant logs below (offset 5876 being the last offset that was successfully processed by the previous batch):

[INFO] 12:44:02.294 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Resuming streaming query, starting with batch 31
[DEBUG] 12:44:02.297 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Found possibly uncommitted offsets {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5877)]}
[DEBUG] 12:44:02.300 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Resuming with committed offsets: {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5876)]}
[DEBUG] 12:44:02.301 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Stream running from {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5876)]} to {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5877)]}
[INFO] 12:44:02.310 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: GetBatch called with start = Some([(maxwell-0,5876)]), end = [(maxwell-0,5877)]
[INFO] 12:44:02.311 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Partitions added: Map()
[DEBUG] 12:44:02.313 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: TopicPartitions: maxwell-0
[DEBUG] 12:44:02.318 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Sorted executors: 
[INFO] 12:44:02.415 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(maxwell-0,5876,5877,None)
[DEBUG] 12:44:02.467 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Retrieving data from KafkaSource[Subscribe[maxwell]]: Some([(maxwell-0,5876)]) -> [(maxwell-0,5877)]
[DEBUG] 12:44:09.242 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Creating iterator for KafkaSourceRDDOffsetRange(maxwell-0,5876,5877,None)
[INFO] 12:44:09.879 [Executor task launch worker-0] biz.meetmatch.streaming.CompanyUpdateListener$$anon$1: open (partitionId:0, version:31)
[DEBUG] 12:44:09.880 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Get spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor maxwell-0 nextOffset -2 requested 5876
[INFO] 12:44:09.881 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Initial fetch for maxwell-0 5876
[DEBUG] 12:44:09.881 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Seeking to spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor maxwell-0 5876
[DEBUG] 12:44:10.049 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Polled spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor [maxwell-0]  1

Also, when I kill the stream, I make sure it is stopped gracefully to avoid data loss:

sys.ShutdownHookThread {
  writeStream.stop
  sparkSession.stop
}

1 Answer

Answered by zsxwing (accepted):

Currently, Structured Streaming checkpoints its state when a new offset is generated, so the behavior you describe is expected: the last committed batch may be reprocessed after recovery. That said, this is an internal implementation detail. Even if the checkpoint were written when a batch is committed, the checkpointing itself could still fail, so your sink, the ForeachWriter, would need to handle reprocessing in any case.

Generally, your sink should always be idempotent.
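To illustrate what idempotent means here, a minimal sketch not tied to Spark: every write is a keyed upsert, so replaying the last batch after recovery leaves the data unchanged. `Company` and the in-memory `table` are stand-ins for the question's case class and Cassandra table.

```scala
import scala.collection.mutable

// Stand-in for the question's case class (fields are assumptions).
final case class Company(id: Long, name: String)

class UpsertSink {
  // Stands in for a Cassandra table keyed by company id.
  val table = mutable.Map.empty[Long, Company]

  // A keyed write: applying it twice with the same record has no extra effect.
  def process(company: Company): Unit = table(company.id) = company
}
```

Cassandra inserts are themselves upserts on the primary key, so a plain keyed INSERT/UPDATE in `process` already gives you this property; reprocessing offset 5876 then simply rewrites the same row.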

Update: as of Spark 2.2.0, Structured Streaming doesn't rerun a batch after recovery if it completed successfully.