How can I run Java MapReduce application on Apache TEZ engine?

46 views Asked by At

I'm using the following code to run the Java MapReduce application in an AWS EMR. Now, I need to run this in the TEZ engine, not the MapReduce engine. When I changed the mapreduce.framework.name property to yarn-tez and tested it against a smaller data set it worked. But when I run the application against a much higher data set it executes the map jobs 100%, but reduce jobs are stuck at 0% when I check the application status using yarn application -status command, it's showing as the progress 0%. Will you be able to help me to understand the issue here?

public class MyFirstMapReduceMain {

    public static void main(String[] args) {
        try {

            String inputSource = args[0];
            String outputResult =  args[1];
            String hostName = args.length > 2 ? args[2] : "localhost";
            String port = args.length > 3 ? args[3] : "10000";
            String database = args.length > 4 ? args[4] : "hive_test_db";
            String username = args.length > 5 ? args[5] : "";
            String password = args.length > 6 ? args[6] : "";
            String appType = args.length > 7 ? args[7] : "";

            Configuration conf = new Configuration();
            conf.set("reducer-hive-host", hostName);
            conf.set("reducer-hive-port", port);
            conf.set("reducer-hive-db", database);
            conf.set("reducer-hive-un", username);
            conf.set("reducer-hive-pw", password);
            conf.set("mapreduce.framework.name", "yarn-tez");

            conf.set("mapreduce.input.fileinputformat.inputformat", "org.apache.hadoop.ql.io.RCFileInputFormat");
            conf.setInt("mapreduce.task.timeout", 1800000);
            conf.setInt("mapred.task.timeout", 1800000);

            Job job = Job.getInstance(conf, "MyFirst MapReduce Job");
            setTextOutputFormatSeparator(job, "|");
            job.setNumReduceTasks(Integer.valueOf(reduceCount));
            job.setJarByClass(MyFirstMapReduceMain.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            FileInputFormat.setInputPaths(job, new Path(inputSource));
            FileOutputFormat.setOutputPath(job, new Path(outputResult));

            if (job.waitForCompletion(true)) {
                HiveHelper.loadOutputToHiveTable(new Path(outputResult), hostName, port, database, username, password, "output_hive_table");
            } else {
                System.exit(1);
            }

        } catch (Exception e) {
            log.error("Exception occurred when executing MyFirstMapReduceMain", e);
        }

    }
0

There are 0 answers