I deployed a Flink application on a Kind cluster (https://kind.sigs.k8s.io/) with 1 master and 2 worker nodes, using a YAML file.
I want to upload Flink checkpoints to an S3 bucket, so I manually created testBucket/checkpoints, but I get this error in the JobManager log:
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: ***********; S3 Extended Request ID: ***********; Proxy: null)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1879) ~[?:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1418) ~[?:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1387) ~[?:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157) ~[?:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814) ~[?:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781) ~[?:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755) ~[?:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715) ~[?:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697) ~[?:?]
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561) ~[?:?]
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541) ~[?:?]
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5456) ~[?:?]
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5403) ~[?:?]
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1372) ~[?:?]
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1346) ~[?:?]
at com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$5(PrestoS3FileSystem.java:667) ~[?:?]
at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139) ~[?:?]
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:664) ~[?:?]
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:648) ~[?:?]
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:353) ~[?:?]
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1760) ~[?:?]
at org.apache.flink.fs.s3presto.common.HadoopFileSystem.exists(HadoopFileSystem.java:165) ~[?:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:148) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:167) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:64) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:329) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1409) ~[flink-dist-1.17.1.jar:1.17.1]
... 7 more
Flink deployment YAML file:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: sampleDeployment
spec:
  image: sampleImage:*.**
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
    state.backend: filesystem
    state.checkpoints.dir: s3://testBucket/checkpoints/
    state.backend.fs.checkpointdir: s3://testBucket/checkpoints/
    s3.access-key: *********
    s3.secret-key: *************
    s3.endpoint: https://s3.us-east-1.amazonaws.com
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///***.jar
    entryClass: com.***.***
    parallelism: 1
    upgradeMode: stateless
    state: running
I also added flink-s3-fs-hadoop-1.17.1.jar and flink-s3-fs-presto-1.17.1.jar to the plugins folder, based on this post (Apache Flink to use S3 for backend state and checkpoints).
Since I created the S3 bucket manually, I don't think this is an access-key/secret-key issue. Is there a solution to this? My AWS account has MFA enabled; could that be a reason?
There are multiple reasons why an S3 bucket can send back a 403 error:
- A DENY rule that affects the permissions of your credentials.
- DENY policies.
Remember that in IAM, an explicit DENY rule takes precedence over all other ALLOW rule(s).
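To make the precedence rule concrete, here is a minimal Python sketch of how an explicit DENY overrides an ALLOW. It is a toy model, not the real IAM evaluation engine: it ignores conditions, principals, permission boundaries and SCPs, and it uses simple glob matching. The `evaluate` function and the policy statements are hypothetical and made up for illustration; only the bucket name comes from the question. The `s3:GetObject` action is used because the stack trace fails in `getObjectMetadata`, a HEAD request, which as far as I know S3 authorizes under `s3:GetObject`.

```python
from fnmatch import fnmatchcase


def evaluate(statements, action, resource):
    """Return "Allow" or "Deny" for one request (simplified IAM-style precedence)."""
    allowed = False
    for stmt in statements:
        matches = (
            any(fnmatchcase(action.lower(), a.lower()) for a in stmt["Action"])
            and any(fnmatchcase(resource, r) for r in stmt["Resource"])
        )
        if not matches:
            continue
        if stmt["Effect"] == "Deny":
            # An explicit DENY wins immediately, no matter how many ALLOWs match.
            return "Deny"
        allowed = True
    # If nothing matched at all, the request is implicitly denied.
    return "Allow" if allowed else "Deny"


# Hypothetical statements: a broad ALLOW plus a narrower explicit DENY.
statements = [
    {"Effect": "Allow", "Action": ["s3:*"],
     "Resource": ["arn:aws:s3:::testBucket/*"]},
    {"Effect": "Deny", "Action": ["s3:GetObject"],
     "Resource": ["arn:aws:s3:::testBucket/checkpoints/*"]},
]

key = "arn:aws:s3:::testBucket/checkpoints/_metadata"
print(evaluate(statements, "s3:GetObject", key))  # explicit DENY overrides the ALLOW
print(evaluate(statements, "s3:PutObject", key))  # only the ALLOW matches
print(evaluate(statements, "s3:GetObject", "arn:aws:s3:::otherBucket/key"))  # implicit deny
```

In this model the first request is denied even though `s3:*` is allowed on the whole bucket, which is the situation the answer describes: having valid keys and a matching ALLOW is not enough if any applicable policy contains a DENY.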