I am trying to run my MapReduce job on Amazon EMR after testing it on my local computer, and I'm getting this error:
    Error: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class Main$MapClass not found
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1720)
        at org.apache.hadoop.mapreduce.task.JobContextImpl.getMapperClass(JobContextImpl.java:186)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:733)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
    Caused by: java.lang.ClassNotFoundException: Class Main$MapClass not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1626)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1718)
        ... 8 more
This is the main function that defines the job configuration:
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public static void main(String[] args) throws Exception {
        // Expect: <inputLocation> <outputLocation> <includeStopWords>
        if (args.length < 3) {
            System.out.println("Missing args!! Number of args: " + args.length);
            for (int i = 0; i < args.length; i++) {
                System.out.println("args[" + i + "]: " + args[i]);
            }
            throw new IllegalArgumentException("Expected: <input> <output> <includeStopWords>");
        }
        String inputLocation = args[0];
        String outputLocation = args[1];
        String includeStopWords = args[2];

        // first job - count the 2-grams by decade
        Configuration conf = new Configuration();
        conf.set("includeStopWords", includeStopWords);
        // Job.getInstance replaces the deprecated new Job(conf, name) constructor
        Job job = Job.getInstance(conf, "words count");
        System.out.println("before set classes:");
        job.setJarByClass(Main.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);
        System.out.println("after setting classes.");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // job.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(inputLocation));
        FileOutputFormat.setOutputPath(job, new Path(outputLocation));
        System.out.println("before wait for completion");
        boolean success = job.waitForCompletion(true);
        // print before exiting; anything after System.exit never runs
        System.out.println("after wait for completion");
        System.exit(success ? 0 : 1);
    }
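The name in the exception, Main$MapClass, is Java's binary name for a static nested class MapClass declared inside a top-level class Main in the default (unnamed) package. job.setMapperClass(...) records that name in the job configuration, and each map task later loads the class by name (the Configuration.getClassByName frame in the trace), so the task's classpath, normally the submitted jar, has to contain Main$MapClass.class. The sketch below uses a hypothetical class Demo in place of Main, assumes it is compiled in the default package as in the question, and uses only standard Java to show the recorded name and the jar entry it corresponds to:

```java
// Hypothetical demo class standing in for the question's Main.
// Assumes it is compiled in the default (unnamed) package.
public class Demo {

    // A static nested class, like the question's MapClass inside Main.
    public static class MapClass {
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Binary name: this is the string a Class.getName() call returns.
        String binaryName = MapClass.class.getName();
        System.out.println(binaryName);               // Demo$MapClass

        // The same name expressed as an entry path inside a jar file.
        String jarEntry = binaryName.replace('.', '/') + ".class";
        System.out.println(jarEntry);                 // Demo$MapClass.class

        // Loading by binary name succeeds only when the class is on the classpath.
        Class<?> loaded = Class.forName(binaryName);
        System.out.println(loaded == MapClass.class); // true
    }
}
```

Placed in a package foo, the same code would print foo.Demo$MapClass and foo/Demo$MapClass.class instead, which is the name the task would then have to find.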
This is the runner code that launches the jobs on EMR:
    import java.io.File;
    import java.io.IOException;

    import org.apache.log4j.LogManager;
    import org.apache.log4j.Logger;

    import com.amazonaws.auth.AWSCredentials;
    import com.amazonaws.auth.PropertiesCredentials;
    import com.amazonaws.services.ec2.model.InstanceType;
    import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
    import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
    import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
    import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
    import com.amazonaws.services.elasticmapreduce.model.PlacementType;
    import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
    import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
    import com.amazonaws.services.elasticmapreduce.model.StepConfig;

    public class Runner {
        public static Logger logger = LogManager.getRootLogger();

        public static void main(String[] args) throws IOException {
            String minPmi;
            String relMinPmi;
            String language;
            String includeStopWords;
            if (args.length > 3) {
                minPmi = args[0];
                relMinPmi = args[1];
                language = args[2];
                includeStopWords = args[3];
            } else {
                System.out.println("Missing Arguments!");
                throw new IllegalArgumentException();
            }

            // Jobs output locations
            String firstOutput = "s3n://dsp152ass2/outputs/first";
            String secondOutput = "s3n://dsp152ass2/outputs/second";
            String thirdOutput = "s3n://dsp152ass2/outputs/third";

            // Jobs jar locations
            String firstJobJar = "s3n://dsp152ass2/jars/firstJob.jar";
            String secondJobJar = "s3n://dsp152ass2/jars/secondJob.jar";
            String thirdJobJar = "s3n://dsp152ass2/jars/thirdJob.jar";

            // Select input corpus by language argument
            String corpus = "s3n://dsp152/output/eng-us-all-100k-2gram"; // TODO: change to the real input
            if (language.equalsIgnoreCase("heb")) {
                corpus = "s3n://dsp152/output/heb-all-100k-2gram";
            }

            // Create EMR client (PropertiesCredentials(File) opens and closes the file itself)
            AWSCredentials credentials = new PropertiesCredentials(new File("credentials.properties"));
            AmazonElasticMapReduce mapReduce = new AmazonElasticMapReduceClient(credentials);

            // Define Hadoop step configs
            HadoopJarStepConfig firstJobConfig = new HadoopJarStepConfig()
                    .withJar(firstJobJar)
                    //.withMainClass("FirstMR.Main") // sec only runner
                    .withArgs(corpus, firstOutput, includeStopWords);
            HadoopJarStepConfig secondJobConfig = new HadoopJarStepConfig()
                    .withJar(secondJobJar)
                    //.withMainClass("Main")
                    .withArgs(firstOutput + "/part-r-00000", secondOutput);
            HadoopJarStepConfig thirdJobConfig = new HadoopJarStepConfig()
                    .withJar(thirdJobJar)
                    //.withMainClass("Main")
                    .withArgs(secondOutput + "/part-r-00000", thirdOutput, minPmi, relMinPmi);

            // Define step configs
            StepConfig firstJobStep = new StepConfig()
                    .withName("firstJobStep")
                    .withHadoopJarStep(firstJobConfig)
                    .withActionOnFailure("TERMINATE_JOB_FLOW");
            StepConfig secondJobStep = new StepConfig()
                    .withName("secondJobStep")
                    .withHadoopJarStep(secondJobConfig)
                    .withActionOnFailure("TERMINATE_JOB_FLOW");
            StepConfig thirdJobStep = new StepConfig()
                    .withName("thirdJobStep")
                    .withHadoopJarStep(thirdJobConfig)
                    .withActionOnFailure("TERMINATE_JOB_FLOW");

            // Define job flow
            JobFlowInstancesConfig instances = new JobFlowInstancesConfig()
                    .withInstanceCount(1) // TODO: change to 2 - 10
                    .withMasterInstanceType(InstanceType.M1Large.toString())
                    .withSlaveInstanceType(InstanceType.M1Large.toString())
                    .withHadoopVersion("2.2.0")
                    .withEc2KeyName("dsp152ass2")
                    .withKeepJobFlowAliveWhenNoSteps(false)
                    .withPlacement(new PlacementType("us-east-1b"));

            // Define run flow
            RunJobFlowRequest runFlowRequest = new RunJobFlowRequest()
                    .withName("DSPextractCollections")
                    .withInstances(instances)
                    .withJobFlowRole("EMR_EC2_DefaultRole")
                    .withServiceRole("EMR_DefaultRole")
                    .withSteps(firstJobStep, secondJobStep, thirdJobStep)
                    .withLogUri("s3n://dsp152ass2/logs/");

            // Run the jobs
            RunJobFlowResult runJobFlowResult = mapReduce.runJobFlow(runFlowRequest);
            String jobFlowId = runJobFlowResult.getJobFlowId();
            System.out.println("### WORKFLOW SUCCESSFULLY ADDED: \n" + "\t" + jobFlowId);
        }
    }
My project structure: [screenshot not included]

Thanks in advance for your help.
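These are steps that can solve the class resolution problem:
1. Make sure the jar you submit actually contains the Mapper and Reducer classes.
2. Put your classes in a package and refer to them by their fully qualified names (e.g. foo.Main, foo.MapClass, ...).

Tip: EMR has specific settings that differ from a local (and probably pseudo-distributed) deployment. Make sure those are correct by following AWS's guide.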
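To check whether the Mapper and Reducer classes really made it into the jar you upload to S3, you can list the jar's entries and look for each class file under its binary name. The JDK's own jar tf firstJob.jar does the listing; the sketch below automates the lookup with java.util.jar only. The class name JarClassCheck, its command-line usage and the sample class names are hypothetical, and it assumes you run it locally on the same jar before uploading it:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.jar.JarFile;

// Hypothetical helper: reports which of the given classes are missing from a jar.
public class JarClassCheck {

    // Converts a binary class name (e.g. "foo.Main$MapClass") to its jar entry path.
    static String toEntryName(String binaryName) {
        return binaryName.replace('.', '/') + ".class";
    }

    // Returns the binary names whose .class entry is not present in the jar.
    static List<String> missingClasses(String jarPath, List<String> binaryNames) throws IOException {
        List<String> missing = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            for (String name : binaryNames) {
                if (jar.getJarEntry(toEntryName(name)) == null) {
                    missing.add(name);
                }
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Quote names containing '$' so the shell does not expand them, e.g.:
        // java JarClassCheck firstJob.jar 'Main' 'Main$MapClass' 'Main$ReduceClass'
        if (args.length < 2) {
            System.err.println("Usage: java JarClassCheck <jar> <binaryClassName>...");
            System.exit(2);
        }
        List<String> names = Arrays.asList(args).subList(1, args.length);
        List<String> missing = missingClasses(args[0], names);
        if (missing.isEmpty()) {
            System.out.println("All classes found in " + args[0]);
        } else {
            System.out.println("Missing from " + args[0] + ": " + missing);
            System.exit(1);
        }
    }
}
```

It only inspects the jar's own top-level entries; classes packed inside jars nested in the jar (for example under a lib/ directory) are not checked.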