I am trying to deploy a production cluster for my Flink program. I am using a standard hadoop-core EMR cluster with Flink 1.3.2 installed, using YARN to run it.
I am trying to configure my RocksDB to write my checkpoints to an S3 bucket. I am trying to go through these docs: https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#set-s3-filesystem. The problem seems to be getting the dependencies working correctly. I receive this error when trying run the program:
java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:93)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:328)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:350)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:282)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createStreamFactory(RocksDBStateBackend.java:273
I have tried both leaving and adjusting the core-site.xml and leaving it as is. I have tried setting the HADOOP_CLASSPATH
to the /usr/lib/hadoop/share
that contains(what I assume are) most of the JARs described in the above guide. I tried downloading the hadoop 2.7.2 binaries, and copying over them into the flink/libs directory. All resulting in the same error.
Has anyone successfully gotten Flink being able to write to S3 on EMR?
EDIT: My cluster setup
AWS Portal:
1) EMR -> Create Cluster
2) Advanced Options
3) Release = emr-5.8.0
4) Only select Hadoop 2.7.3
5) Next -> Next -> Next -> Create Cluster ( I do fill out names/keys/etc)
Once the cluster is up I ssh into the Master and do the following:
1 wget http://apache.claz.org/flink/flink-1.3.2/flink-1.3.2-bin-hadoop27-scala_2.11.tgz
2 tar -xzf flink-1.3.2-bin-hadoop27-scala_2.11.tgz
3 cd flink-1.3.2
4 ./bin/yarn-session.sh -n 2 -tm 5120 -s 4 -d
5 Change conf/flink-conf.yaml
6 ./bin/flink run -m yarn-cluster -yn 1 ~/flink-consumer.jar
My conf/flink-conf.yaml I add the following fields:
state.backend: rocksdb
state.backend.fs.checkpointdir: s3:/bucket/location
state.checkpoints.dir: s3:/bucket/location
My program's checkpointing setup:
env.enableCheckpointing(getCheckpointRate,CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(getCheckpointMinPause)
env.getCheckpointConfig.setCheckpointTimeout(getCheckpointTimeout)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.setStateBackend(new RocksDBStateBackend("s3://bucket/location", true))
If there are any steps you think I am missing, please let me know
I assume that you installed Flink
1.3.2
on your own on the EMR Yarn cluster, because Amazon does not yet offer Flink1.3.2
, right?Given that, it seems as if you have a dependency conflict. The method
org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration)
was only introduced with Hadoop2.4.0
. Therefore I assume that you have deployed a Flink1.3.2
version which was built with Hadoop2.3.0
. Please deploy a Flink version which was built with the Hadoop version running on EMR. This will most likely solve all dependency conflicts.Putting the Hadoop dependencies into the
lib
folder seems to not reliably work because theflink-shaded-hadoop2-uber.jar
appears to have precedence in the classpath.