Apache Flink losing records when task manager is restarted


I am running a Flink cluster with one job manager pod and two task manager pods in a Kubernetes cluster. When I submit a streaming job to the job manager, it runs and I receive the output in the sink. I have also enabled checkpointing to recover from failures. When I intentionally delete one of the task manager pods to verify how Flink handles node failure, some of the records that were supposed to arrive at the sink are never received. When Kubernetes automatically restarts the pod, the job continues processing records, but it does not recover from the checkpoint. I submit the job with the following command:

flink run -Dparallelism=2 -m localhost:<port> -c <flink job> -p=2 <flink job>.jar

I have the following settings in the job environment:

    env.enableCheckpointing(10000)
    env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.setStateBackend(new FsStateBackend(Paths.get(<checkpoint path>).toUri, false))

When the task manager pod restarts, I see the following log line:

2020-10-01 10:01:30,096 INFO  org.apache.flink.runtime.blob.BlobClient                     [] - Downloading 2966c462794bf94523e9a53c1d9a2f13/p-421bdbdb924a09ddc017b854d52d9a9457109d43-7307729d1f0408de20cd29e352a2a655 from flinksessioncluster-sample-jobmanager/172.20.225.40:6124

But the checkpoint directory 2966c462794bf94523e9a53c1d9a2f13 contains only the following items:

chk-299  shared  taskowned

I do not have the directory p-421bdbdb924a09ddc017b854d52d9a9457109d43-7307729d1f0408de20cd29e352a2a655 inside the directory 2966c462794bf94523e9a53c1d9a2f13.

According to the docs, the task should automatically recover from the checkpointed location.

Please let me know where the problem might be.

UPDATE

Actual test carried out:

I continuously inserted records into the Flink job at a 't'-second interval. While the task managers were processing records, I killed one of the task manager pods and, at that point, stopped inserting records. By then I had inserted 1000 records on the input side. When the task manager came up again, there were 700 records in the sink.

I then started inserting records one at a time and saw the record count in the sink suddenly jump to 940, after which it increased by 1 per insert; that is, the records inserted after the task manager crash started arriving in the sink. But 60 of the initial 1000 records, all of which were inserted before the task manager crash, were lost.
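
To help narrow this down, the checkpointing mode and the restart strategy can be stated explicitly in the job instead of relying on defaults. The following is only a minimal sketch, assuming the Flink 1.11 Scala API; the object name, the restart values (3 attempts, 10 seconds apart), and the `fromElements` pipeline are hypothetical placeholders and not part of the original job. Note that checkpoint recovery only restores Flink's own state: records are replayed without loss only if the source can be rewound to the checkpointed position, so a source that cannot replay may still drop in-flight records.

```scala
import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._

// Hypothetical object name, used for illustration only.
object CheckpointedJobSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpoint every 10 s, as in the question, with the mode made explicit.
    env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE)

    // Keep the latest checkpoint when the job is cancelled, as in the question.
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // Assumed values: restart the job up to 3 times, waiting 10 s between attempts.
    env.setRestartStrategy(
      RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)))

    // Placeholder pipeline; the real job would use its own source and sink here.
    env.fromElements(1, 2, 3).map(_ * 2).print()

    env.execute("checkpointed-job-sketch")
  }
}
```

With this in place, the job manager log should show the job restarting and restoring from the latest completed checkpoint after the task manager pod is deleted, which makes it easier to tell a recovery problem apart from a source or sink that does not support replay.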
