Flink: "Unable to Shutdown Event Loop" when stopping with savepoint on Managed Apache Flink


I'm running a Flink 1.15 application on AWS Managed Service for Apache Flink. The job topology is relatively simple: a single Kinesis source that uses EFO (enhanced fan-out), a map and a filter operation, and a DynamoDB sink. Any time changes are deployed to the app, it remains stuck in the "Updating" state with the following errors:

{
"locationInformation": "org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyClientLogger.error(NettyClientLogger.java:100)",
"logger": "org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient",
"message": "Unable to close channel pools",
"messageSchemaVersion": "1",
"messageType": "ERROR",
"threadName": "Source: XXXX -> Filter -> XXXX -> Sink: XXXX (7/36)#6",
"throwableInformation": "java.lang.RuntimeException: java.util.concurrent.TimeoutException\n\tat org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.AwaitCloseChannelPoolMap.close(AwaitCloseChannelPoolMap.java:175)\n\tat org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.runAndLogError(NettyUtils.java:377)\n\tat org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.close(NettyNioAsyncHttpClient.java:197)\n\tat org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.close(KinesisProxyV2.java:91)\n\tat org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherFactory.close(FanOutRecordPublisherFactory.java:97)\n\tat org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.closeRecordPublisherFactory(KinesisDataFetcher.java:843)\n\tat org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:817)\n\tat org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:410)\n\tat org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)\n\tat org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305)\n\tat org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)\n\tat org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)\n\tat org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)\n\tat org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)\n\tat org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)\n\tat org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)\n\tat org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)\n\tat org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)\n\tat org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: java.util.concurrent.TimeoutException\n\tat java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)\n\tat java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)\n\tat org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.AwaitCloseChannelPoolMap.close(AwaitCloseChannelPoolMap.java:170)\n\t... 22 more\n"
}
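The shape of this trace (a RuntimeException whose cause is a TimeoutException raised in CompletableFuture.get) is what a bounded wait on a close future produces when that future does not complete in time. The following is a minimal sketch of that pattern using only the JDK. It is an illustration, not the AWS SDK's actual code: the class name BoundedCloseSketch, the method close, and the 100 ms timeout are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a client whose close() waits a bounded time
// for its connection pools to drain. Not the SDK's real implementation.
class BoundedCloseSketch {

    // Waits up to timeoutMillis for poolsClosed to complete. A timeout is
    // rethrown wrapped in RuntimeException, matching the shape in the log
    // ("RuntimeException: java.util.concurrent.TimeoutException").
    static void close(CompletableFuture<Void> poolsClosed, long timeoutMillis) {
        try {
            poolsClosed.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new RuntimeException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // A future that never completes models channels that never finish closing.
        CompletableFuture<Void> neverDone = new CompletableFuture<>();
        try {
            close(neverDone, 100);
        } catch (RuntimeException e) {
            System.out.println("close failed, cause: " + e.getCause().getClass().getName());
        }
    }
}
```

Read this way, the ERROR says only that the pools did not finish closing within the wait; it does not by itself say why they were slow to close.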

After some digging in the logs, I'm also seeing errors that, to my inexperienced eyes, look like the connection to the Kinesis stream is being severed before the app manages to shut down gracefully:

{ "locationInformation": "org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.handleError(FanOutShardSubscriber.java:275)", "logger": "org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber", "message": "Error occurred on EFO subscription: org.apache.flink.kinesis.shaded.io.netty.channel.StacklessClosedChannelException - (null). shardId-000000000129 (arn:aws:kinesis:XXXX:XXXXX:stream/XXXXX/consumer/XXXXX)", "messageSchemaVersion": "1", "messageType": "WARN", "threadName": "shardConsumers-Source: XXXX -> Filter -> KXXXX -> Sink: XXXXX (18/36)#6-thread-0", "throwableInformation": "org.apache.flink.kinesis.shaded.io.netty.channel.StacklessClosedChannelException\n\tat org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)\n" }

My best guess is that this leads to the checkpoint failure thrown here:

org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
    at org.apache.flink.runtime.messages.checkpoint.SerializedCheckpointException.unwrap(SerializedCheckpointException.java:51)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1013)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task name with subtask : Source: XXX -> Filter -> XXXX -> Sink: XXX (4/36)#16 Failure reason: Task has failed.
    at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1395)
    at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1338)
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.RejectedExecutionException: event executor terminated
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063)
    ... 3 more
Caused by: java.util.concurrent.RejectedExecutionException: event executor terminated
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923)
//CUTOFF//
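The innermost cause above, a RejectedExecutionException surfacing as a CompletionException from a composed CompletableFuture, is the generic symptom of handing work to an executor that has already been shut down. The sketch below reproduces that chain with plain JDK classes. It assumes nothing about Flink or Netty internals: a JDK single-thread executor stands in for the terminated event loop, and the names TerminatedExecutorSketch and submitVia are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative only: a JDK executor plays the role of the shut-down event loop.
class TerminatedExecutorSketch {

    // Chains a step that hands work to the given executor. If the executor is
    // already shut down, execute() throws RejectedExecutionException inside
    // thenCompose, so the returned future completes exceptionally.
    static CompletableFuture<String> submitVia(ExecutorService executor) {
        return CompletableFuture.completedFuture("record")
                .thenCompose(value -> {
                    CompletableFuture<String> result = new CompletableFuture<>();
                    executor.execute(() -> result.complete(value));
                    return result;
                });
    }

    public static void main(String[] args) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        eventLoop.shutdown(); // models the event loop being terminated first
        try {
            submitVia(eventLoop).join();
        } catch (CompletionException e) {
            // The cause is the rejection raised by the terminated executor.
            System.out.println("cause: " + e.getCause().getClass().getName());
        }
    }
}
```

This is consistent with the ordering suspected in the question (the client's event loop goes away while the source thread is still issuing work), though the sketch only demonstrates the exception mechanics, not that this ordering is what happens in the job.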

Reading through the logs is a bit of a mess, as it's hard to tell what's relevant to this issue, but hopefully I've included enough. Any pointers would be greatly appreciated.
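For readers reproducing the setup: a Kinesis source is usually switched to EFO through the consumer properties passed to FlinkKinesisConsumer. The sketch below builds such a Properties object with plain string keys so that it runs without Flink on the classpath. It is not the configuration of the job in question: the region and consumer name are placeholders, and the literal keys are my understanding of the values behind AWSConfigConstants.AWS_REGION, ConsumerConfigConstants.RECORD_PUBLISHER_TYPE and ConsumerConfigConstants.EFO_CONSUMER_NAME, which should be checked against the connector version in use.

```java
import java.util.Properties;

// Hypothetical helper; in a real job these properties would be passed to
// the FlinkKinesisConsumer constructor.
class EfoConsumerPropsSketch {

    // Key strings are assumed to match the connector's config constants.
    static Properties efoProperties(String region, String consumerName) {
        Properties props = new Properties();
        props.setProperty("aws.region", region);                          // assumed AWS_REGION key
        props.setProperty("flink.stream.recordpublisher", "EFO");         // assumed RECORD_PUBLISHER_TYPE key
        props.setProperty("flink.stream.efo.consumername", consumerName); // assumed EFO_CONSUMER_NAME key
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values, not taken from the question.
        Properties props = efoProperties("us-east-1", "my-efo-consumer");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```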
