We deployed a Flink application through Amazon Managed Service for Apache Flink. The application performs as expected during normal runtime, but we have faced challenges maintaining throughput during upgrades of the Flink application.
Setup:
At a high level, this is a real-time streaming application:
- Business events land in the source Kinesis stream
- Source: the Flink app polls the above-mentioned Kinesis stream with EFO (Enhanced Fan-Out)
- The Flink app then processes and transforms the data
- Target sink: a custom sink to a SageMaker Feature Group (you can think of this as the target database)
A record that runs through the pipeline (Kinesis -> Flink -> SageMaker Feature Group, the target database) normally experiences a latency of around 100-200 ms.
Downstream applications expect records to be in the target SageMaker Feature Group within 1 s of the original event landing in the Kinesis stream.
We scaled up to at least 8 KPUs.
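For context, a rough sketch of how that scaling looks in the Terraform application config (hedged: argument names follow the aws_kinesisanalyticsv2_application resource in the AWS provider, and the exact values are illustrative):

flink_application_configuration {
  parallelism_configuration {
    configuration_type   = "CUSTOM"
    parallelism          = 8      # total job parallelism
    parallelism_per_kpu  = 1      # one KPU per parallel subtask -> roughly 8 KPUs
    auto_scaling_enabled = true
  }
}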
The Issue we faced:
When we deploy a new version of the Flink jar (for example, updating from version1.jar to version2.jar), we face a disruption of one to two minutes. We use Terraform to update the S3 key of the Flink jar:
application_code_configuration {
  code_content {
    s3_content_location {
      bucket_arn = "arn:aws:s3:::example_bucket"
      file_key   = "version2.jar"   # <-- updated to point at the new jar
    }
  }
}
The scale up and other portions do not change, only the jar is updated.
We noticed that the end-to-end latency of records (we have a monitor that measures the time from when a record lands in Kinesis to when it is pushed out to the sink) can reach 20 seconds or more.
The metric millisBehindLatest (from https://docs.aws.amazon.com/managed-flink/latest/java/metrics-dimensions.html) also jumps to around 10-20 seconds or more, indicating that Flink has fallen behind the latest records in the Kinesis stream. Eventually, after 1-2 minutes, both the average record latency and millisBehindLatest come back down to normal.
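(Our latency monitor is separate, but for reference, a simple way to watch this metric is a CloudWatch alarm. The sketch below is illustrative only; it assumes the AWS/KinesisAnalytics namespace and an Application dimension matching the app name, and the alarm name, placeholder app name, and threshold are made up.)

resource "aws_cloudwatch_metric_alarm" "flink_behind_latest" {
  alarm_name          = "flink-millis-behind-latest"     # hypothetical name
  namespace           = "AWS/KinesisAnalytics"
  metric_name         = "millisBehindLatest"
  dimensions          = { Application = "my-flink-app" } # placeholder application name
  statistic           = "Maximum"
  period              = 60
  evaluation_periods  = 1
  threshold           = 5000                             # alert when > 5 s behind latest
  comparison_operator = "GreaterThanThreshold"
}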
But downstream applications that rely on the target database (SageMaker Feature Groups) being populated in time suffer through this latency.
Our understanding is that this happens because Flink restarts the job, redistributes the Kinesis shards across subtasks, and reloads state from the latest checkpoint/snapshot. So no data is lost (state is restored from the checkpoint plus the last Kinesis stream position), but processing stalls while this happens.
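For what it is worth, this snapshot/restore behavior maps to the following settings in the application config; a minimal Terraform sketch (argument names per the aws_kinesisanalyticsv2_application resource, values illustrative, assuming snapshots are enabled):

application_configuration {
  application_snapshot_configuration {
    snapshots_enabled = true   # a snapshot is taken when the application is updated or stopped
  }

  run_configuration {
    application_restore_configuration {
      # resume from the last snapshot, then catch up from the recorded Kinesis position
      application_restore_type = "RESTORE_FROM_LATEST_SNAPSHOT"
    }
  }
}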
Our questions:
AWS Support mentioned that it is "normal" for Flink ingestion/output to be slower during a jar update like the one described above, but could not tell us what an "average disruption time" is (is 1-2 minutes normal?).
So we are wondering whether other users of Amazon Managed Service for Apache Flink have faced a similar issue, and what the recommendation would be to reduce it.
We are thinking internally that:
By its nature, our application can tolerate the output in the target database (SageMaker Feature Group) containing a mix of old and new data for some period.
During an upgrade, we would:
- fire up a new Flink application in parallel with the old one (blue/green)
- once the new application is up, terminate the old one (or just stop it, so it can serve as the next "green" in a future update)
This is more complex, but it should reduce the downtime caused by updates (a rough Terraform sketch follows below).
Our application can tolerate two Flink apps writing to the same target (the latest record overrides the previously written one), even if the record from the older Flink app occasionally ends up being the latest write.
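A rough Terraform sketch of the blue/green idea (hedged: the resource, variable, and application names here are hypothetical, the runtime version is illustrative, and most of the application_configuration is elided):

variable "live_color"     { default = "blue" }   # which copy should currently be running
variable "flink_role_arn" {}                     # service execution role (hypothetical)
variable "jar_key" {
  default = { blue = "version1.jar", green = "version2.jar" }  # jar per color
}

# Two copies of the Managed Flink application; only the live one is started.
# Cutover: point the idle copy at the new jar, start it, then stop the old one.
resource "aws_kinesisanalyticsv2_application" "pipeline" {
  for_each               = toset(["blue", "green"])
  name                   = "feature-ingest-${each.key}"
  runtime_environment    = "FLINK-1_18"
  service_execution_role = var.flink_role_arn
  start_application      = each.key == var.live_color

  application_configuration {
    application_code_configuration {
      code_content_type = "ZIPFILE"
      code_content {
        s3_content_location {
          bucket_arn = "arn:aws:s3:::example_bucket"
          file_key   = var.jar_key[each.key]
        }
      }
    }
  }
}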
But we are wondering whether others have better ideas for how to do this.
Thanks!
You can reduce the amount of time required to "catch up" after a new jar deployment by decreasing your checkpoint interval. But I don't see a way to keep latency under 1 second during this process, given that AWS has to tear down the current Flink job (taking a savepoint) and then redeploy it (restoring from that savepoint).
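For example, something along these lines in the application config should help (a sketch, assuming the aws_kinesisanalyticsv2_application Terraform resource you are already using; the 10-second interval is only illustrative):

flink_application_configuration {
  checkpoint_configuration {
    configuration_type            = "CUSTOM"
    checkpointing_enabled         = true
    checkpoint_interval           = 10000   # checkpoint every 10 s instead of the 60 s default
    min_pause_between_checkpoints = 5000
  }
}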
So I think your proposal for running a second Flink job is likely the best option. You will need to be careful about starting that job from a savepoint taken from the first job, of course, to ensure consistent (duplicated) results.