I am trying to use a file from Google Cloud Storage, via FileInputFormat, as input for a MapReduce job. The file is in Avro format.
As a simple test, I deployed a small Hadoop 2 cluster with the bdutil tool, consisting of the master node and two worker nodes with two slots each.
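Roughly, the cluster was brought up with something along these lines (the exact flags may differ between bdutil versions, so treat this as a sketch rather than the exact command):

```
# Deploy a Hadoop 2 cluster with two workers using bdutil; project, bucket,
# and zone come from your own bdutil configuration or additional flags.
./bdutil -e hadoop2_env.sh -n 2 deploy
```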
When the job runs, the file is split into multiple parts, which can be verified by looking at the logs, where an offset is used to load the data. As a result, multiple map tasks are created. So far, nothing unusual.
But those map tasks do not get distributed across the worker nodes. Instead, two are started on a single node and the others are left in the Scheduled state.
I expected two map tasks to run on each worker, since the data is not locally available on any worker node (it's in Cloud Storage), which makes all of them equal candidates.
Why does this happen?
It appears you're seeing one of the artifacts of how YARN works: unlike Hadoop 1, where the JobTracker effectively played the roles of Hadoop 2's AppMaster and ResourceManager at the same time, in Hadoop 2 the ResourceManager (running on your master node) packs a brand-new AppMaster into a YARN container on demand for each MapReduce job.
Another concept that changes a bit is that you no longer have fixed "slots"; YARN containers are scheduled across both the memory and CPU dimensions. This means that if a single task packed onto YARN requests lots of memory but only 1 CPU, it may occupy a resource footprint that would otherwise have fit several map or reduce tasks.
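Those per-container requests correspond to standard MRv2/YARN properties, so, independently of bdutil, you can see the effect by overriding them for a single job on the command line. This is only an illustration: the jar, class, and gs:// paths below are made up, and it assumes a driver that uses ToolRunner so the -D generic options are honored.

```
# Illustrative per-job resource requests (standard Hadoop 2 property names);
# jar name, main class, and paths are placeholders.
hadoop jar my-avro-job.jar com.example.MyAvroJob \
  -D yarn.app.mapreduce.am.resource.mb=1024 \
  -D mapreduce.map.memory.mb=1536 \
  -D mapreduce.map.cpu.vcores=1 \
  gs://my-bucket/input/file.avro gs://my-bucket/output/
```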
For example, assuming you deployed 2 workers, each an n1-standard-2, you can see the per-node resource usage on the ResourceManager page at http://<master-ip>:8088/cluster/nodes while your MapReduce job is running. In this case, visiting the application_master link from the ResourceManager showed that the AppMaster was indeed packed onto the VM that reported "5.5GB Mem Used, 0B Mem Avail, 1 VCores Used". Similarly, I found that my map tasks were running only on the worker that reported "2 VCores Used".
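If you prefer the command line to the web UI, the same per-node numbers should also be visible through the yarn CLI (a quick sketch; the node ID to pass is whatever the listing reports for your cluster):

```
# List NodeManagers and how many containers each is currently running.
yarn node -list

# Show memory/vcore usage for a single node; use a Node-Id from the listing above.
yarn node -status <node-id-from-listing>
```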
In general, this means that if you're mostly interested in making sure the job scales as you increase the number of workers, you don't have to do anything special; you'll just end up with your map or reduce tasks packed onto NUM_WORKERS - 1 possible machines while one of them runs the AppMaster for the job. Depending on your job, though, this might be wasteful. The default settings are best for extremely large jobs, where it makes sense to have a very large AppMaster so that it doesn't OOM while tracking a large number of in-flight tasks. You can adjust the fine-grained settings by overriding
NODEMANAGER_MEMORY_FRACTION, CORES_PER_MAP_TASK, CORES_PER_REDUCE_TASK, and CORES_PER_APP_MASTER in your custom *_env.sh file (or inline in hadoop2_env.sh, though that is harder to keep track of across upgrades than maintaining a file like my_overrides_env.sh). The comments in bdutil's hadoop2_env.sh explain these settings in detail. Especially if you move up to larger machines like n1-standard-4, you may want to consider simply modifying NODEMANAGER_MEMORY_FRACTION to a smaller value.
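As a rough illustration only (the values below are placeholders rather than recommendations, and the comments in hadoop2_env.sh remain the authoritative reference for what each setting means), such an override file might look like this:

```
# my_overrides_env.sh -- example bdutil override file; values are illustrative.

# Fraction of each worker VM's memory handed to the NodeManager for containers.
NODEMANAGER_MEMORY_FRACTION=0.6

# Relative core weight requested per map task, reduce task, and AppMaster;
# smaller values let more containers pack onto a single node.
CORES_PER_MAP_TASK=1.0
CORES_PER_REDUCE_TASK=2.0
CORES_PER_APP_MASTER=2.0
```

It would then be passed alongside hadoop2_env.sh at deploy time (for example via -e hadoop2_env.sh,my_overrides_env.sh, though the exact flag syntax depends on your bdutil version).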