I've been struggling with distcp for several days and I swear I have googled enough. Here is my use case:
USE CASE
I have a main folder in a certain location, say /hdfs/root, with a lot of subdirs (depth is not fixed) and files.
Volume: 200,000 files ~= 30 GB
I need to copy only a subset of /hdfs/root for a client, to another location, say /hdfs/dest. This subset is defined by a list of absolute paths that can be updated over time.
Volume: 50,000 files ~= 5 GB
You understand that I can't use a simple hdfs dfs -cp /hdfs/root /hdfs/dest
because it is not optimized: it would copy every file, and it has no -update mode.
SOLUTION POC
I ended up using hadoop distcp in two ways:
Algo 1 (simplified):
# I start up to N distcp jobs in parallel, one per subdir, with N=MAX_PROC (~30)
foreach subdir in subdirs:
    # filelist = /hdfs/root/dirX/file1 /hdfs/root/dirX/file2 ...
    filelist = buildList(subdir)
    hadoop distcp -i -pct -update filelist /hdfs/dest/subdir &
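The pseudocode above can be sketched as an actual shell loop with job throttling. A dry-run sketch: SUBDIRS and MAX_PROC are illustrative names, and echo stands in for the real hadoop distcp invocation so nothing is actually launched.

```shell
#!/usr/bin/env bash
# Sketch of Algo 1: up to MAX_PROC distcp jobs in parallel, one per subdir.
MAX_PROC=30
SUBDIRS="dirA dirB dirC"          # in practice, built from the client's path list

for subdir in $SUBDIRS; do
  # throttle: block while MAX_PROC background jobs are still running
  while [ "$(jobs -rp | wc -l)" -ge "$MAX_PROC" ]; do
    sleep 5
  done
  # dry run: echo instead of the real hadoop distcp command
  echo hadoop distcp -i -pct -update \
       "/hdfs/root/$subdir" "/hdfs/dest/$subdir" &
done
wait    # let the last batch of jobs finish before exiting
```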
and
Algo 2
# I start one distcp that has a blacklist
blacklist = buildBlackList()
hadoop distcp -numListstatusThreads 10 -filters blacklist -pct -update /hdfs/root /hdfs/dest
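For the record, distcp's -filters option expects a file containing one Java regular expression per line; any path matching a regex is excluded from the copy. A minimal sketch (the file name and patterns below are made up, and echo stands in for the real launch):

```shell
# Build a hypothetical blacklist file for `distcp -filters`:
# one Java regex per line; matching paths are skipped.
cat > /tmp/distcp-blacklist <<'EOF'
.*/private_dir/.*
.*\.tmp
EOF

# The job would then be launched as (shown as a dry run):
echo hadoop distcp -numListstatusThreads 10 \
     -filters /tmp/distcp-blacklist -pct -update /hdfs/root /hdfs/dest
```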
Algo 2 does not even start: it seems that building the diff between the source and the blacklist is too hard for it. So I use Algo 1, and it works.
OOZIE WORKFLOW
Now I need to schedule the whole thing in an Oozie workflow. I have put Algo 1 in a shell action, since I have a lot of distcp commands and I don't master recursion or loops in Oozie.
Once started, after a while, I get the following error: Container runs beyond physical memory limits. Current usage: 17.2 GB of 16 GB physical memory used
Alright then, I'm going to add more memory:
<configuration>
<property>
<name>oozie.launcher.mapreduce.map.memory.mb</name>
<value>32768</value>
</property>
<property>
<name>oozie.launcher.mapreduce.map.java.opts</name>
<value>-Xmx512m</value>
</property>
</configuration>
And still I get: Container runs beyond physical memory limits. Current usage: 32.8 GB of 32 GB physical memory used. But the job lived twice as long as the previous one.
The RAM on my cluster is not infinite, so I can't go further. Here are my hypotheses:
- A distcp job does not release memory (JVM garbage collection?)
- Oozie counts the memory of all the distcp jobs together as the current usage, which is stupid
- This is not the right way to do this (well, I know, but still)
Also, there are a lot of things I did not understand about memory management; it's pretty foggy to me (YARN, Oozie, JVM, MapReduce).
While googling, I noticed few people talk about real distcp use cases. This post is 4 days old: https://community.hortonworks.com/articles/71775/managing-hadoop-dr-with-distcp-and-snapshots.html and explains snapshot usage, which I can't use in my case.
I've also heard about http://atlas.incubator.apache.org, which could eventually solve my problem by "tagging" files and granting access to specific users, so we could avoid copying to a separate location. My admin team is working on it, but we won't get it into production anytime soon.
I'm quite desperate. Help me.
YARN containers are built on top of Linux "cgroups". These "cgroups" are used to put soft limits on CPU, but not on RAM...
Therefore YARN uses a clumsy workaround: it periodically checks how much RAM each container uses, and brutally kills anything that goes over quota. So you lose the execution logs, and only get that dreadful message you have seen.
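For reference, that periodic check is the NodeManager's memory monitor, controlled by these yarn-site.xml properties (shown with their default values; disabling the checks is a last resort, not a fix):

```xml
<!-- yarn-site.xml: the monitor that kills over-quota containers -->
<property>
  <name>yarn.nodemanager.pmem-check-enabled</name>
  <value>true</value>
</property>
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>true</value>
</property>
```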
In most cases, you are running some kind of JVM binary (i.e. a Java/Scala utility or custom program), so you can get away with setting your own JVM quotas (especially -Xmx) so that you always stay under the YARN limit. That means some wasted RAM because of the safety margin. But then the worst case is a clean failure of the JVM when it runs out of memory: you get the execution logs in extenso and can start adjusting the quotas -- or fixing your memory leaks :-/
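To illustrate that safety margin (the values here are made up, the property names are the standard MapReduce ones): container size and JVM heap are set by two distinct properties, and the heap should stay at roughly 75-80% of the container.

```xml
<!-- Example pairing: a 4 GB container with a heap capped below it,
     leaving headroom for JVM overhead (metaspace, threads, etc.) -->
<property>
  <name>mapreduce.map.memory.mb</name>
  <value>4096</value>
</property>
<property>
  <name>mapreduce.map.java.opts</name>
  <value>-Xmx3276m</value>
</property>
```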
So what happens in your specific case? You are using Oozie to start a shell -- then the shell starts a hadoop command, which runs in a JVM. It is on that embedded JVM that you must set the max heap size.

Long story short: if you allocate 32 GB to the YARN container that runs your shell (via oozie.launcher.mapreduce.map.memory.mb), then you must ensure that the Java commands inside the shell do not consume more than, say, 28 GB of heap (to stay on the safe side).

If you are lucky, setting a single env variable will do the trick:
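One plausible candidate is HADOOP_CLIENT_OPTS, which the hadoop wrapper script passes to the client JVM it spawns -- check your distro's hadoop-env.sh to confirm it is honored. A sketch, reusing the 28 GB figure from above:

```shell
# Cap the heap of every `hadoop` client JVM started from this shell.
export HADOOP_CLIENT_OPTS="-Xmx28g"

# hadoop distcp -i -pct -update ...   # would now inherit the 28 GB heap cap
```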
If you are not lucky, you will have to unwrap the whole mess of hadoop-env.sh, which mixes different env variables with different settings (set by people who visibly hate you, in init scripts you cannot even know about), to be interpreted by the JVM using complex precedence rules. Have fun. You may peek at that very old post for hints about where to dig.