I am running a hadoop code, and having problems.
Notice the the commented lines "debug exception 1" and "debug exception 2" and the line below each of them. Since I can't print messages with System.out.println inside a hadoop map/reduce, the only debugging method I have come to is by throwing an exception with a message as you can see below the mentioned comments.
public class FindCoallocations {
public static String bucketName;
public static AWS aws;
public static boolean debugMode = true;
public static String uniqueWord = "43uireoaugibghui4reagf"; //This word doesn't exist in the corpus (hopefully).
public static String decade;
public static long N = 1;
public static class MapperClassW1 extends Mapper<LongWritable, Text, Text , Text> {
private Text word_1;
private Text word_2;
private Text years;
private Text matchCount;
public String decade;
@Override
public void setup(Context context) {
Configuration config = context.getConfiguration();
decade = config.getStrings("decade")[0];
}
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
word_1 = new Text();
word_2 = new Text();
years = new Text();
matchCount = new Text();
StringTokenizer itr = new StringTokenizer(value.toString());
word_1.set(itr.nextToken());
word_2.set(itr.nextToken());
years.set(itr.nextToken());
matchCount.set(itr.nextToken());
if(years.toString().equals(decade)){
Text countN = new Text(uniqueWord);
context.write(countN, matchCount); // used for counting N (number of words)
context.write (word_1, value); // used for claculating c(w1)
//throw new IOException("exception at mapper. key: "+key.toString()+" value: "+value.toString() +" word_1: "+word_1.toString());
}
}
}
public static class PartitionerClass extends Partitioner<Text, Text> {
@Override
public int getPartition(Text key, Text value, int numPartitions) {
return (key.toString().hashCode() % numPartitions);
}
}
public static class ReducerClassW1 extends Reducer<Text,Text,Text,Text> {
public String decade;
public void setup(Context context) {
Configuration config = context.getConfiguration();
decade = config.getStrings("decade")[0];
}
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
if((key.toString()).equals(uniqueWord)) // count N
{
long sum = 0;
for (Text value : values) {
sum += Long.parseLong(value.toString());
}
//context.write(key, new Text(Long.toString(sum)));
context.getCounter(Counter.COMBINE_INPUT_RECORDS).increment(sum);
}
else // count w1 ( value is full entry)
{
long sum = 0;
for (Text value : values) {
StringTokenizer itr = new StringTokenizer(value.toString());
itr.nextToken();
itr.nextToken();
itr.nextToken();
String tk = itr.nextToken();
// debug exception 1
//throw new IOException ("the key: "+key.toString()+" the value: "+value.toString()+ " sum: "+sum + "itr.nextToken(): "+tk);
sum += Long.parseLong(tk);
}
for (Text value : values) {
// debug exception 2
//throw new IOException ("the key: "+key.toString()+" the value: "+value.toString()+ " sum: "+sum);
context.write(value, new Text("w1:"+Long.toString(sum)));
}
}
}
}
//Receives n args: 0 - "Step1" , 1 - decade , 2 - inputFileName1, ....... n-1 inputFileName(n-2) , n outputFileName
public static void main(String[] args) throws Exception {
System.out.println("[DEBUG] STEP 1 started!");
aws = AWS.getInstance();
bucketName = aws.bucketName;
int inputs = args.length;
System.out.println("args: ");
for(int i=0; i<args.length;i++){
System.out.print("args["+i+"]"+" : "+args[i] +", ");
}
System.out.println("\n");
String[] inputFileKey = new String[args.length-3];
String outputFileKey = args[args.length-1];
for(int i=2; i<args.length - 1;i++){
inputFileKey[i-2] = args[i];
}
FindCoallocations.setDecade(args[1]);
System.out.println("decade: "+FindCoallocations.getDecade());
for(int i=0; i<inputFileKey.length;i++){
System.out.println("input: "+inputFileKey[i]);
}
System.out.println("output: "+outputFileKey);
Configuration conf = new Configuration();
conf.setQuietMode(false);
conf.setStrings("decade",FindCoallocations.getDecade());
Job job = Job.getInstance(conf, "Join w1 count");
job.setJarByClass(FindCoallocations.class);
job.setMapperClass(MapperClassW1.class);
job.setPartitionerClass(PartitionerClass.class);
job.setCombinerClass(ReducerClassW1.class);
job.setReducerClass(ReducerClassW1.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
for(int i=0; i<inputFileKey.length; i++){
FileInputFormat.addInputPath(job, new Path("s3://"+bucketName+"/"+inputFileKey[i]));
}
FileOutputFormat.setOutputPath(job, new Path("s3://"+bucketName+"/"+outputFileKey+"_"+decade+"_w1"));
job.waitForCompletion(true);
job.monitorAndPrintJob();
}
}
First question, is there a better way to debug? Currently for each run trial I need to export a new Jar, upload it to S3 (AWS), and run the hadoop, wait until it is done and only then see the output (the exception message). The whole process takes about 10 minutes. It takes forever to debug this way...
Second question, right now my problem is that with the given code, my output file turns out empty. When I uncomment the line below "debug exception 1", I get the following message:
Error: java.io.IOException: the key: nt the value: nt should_VERB 1830's 1 sum: 0itr.nextToken(): 1
So far so good. When I comment it back, and uncomment "debug exception 2" instead, I don't get any exception at all, indicating the next for-loop is not reached / doesn't iterate.
When I comment BOTH debug exceptions (not trying to throw any exception at all), the program terminates successfuly, and the output file turns out empty. (Meanning there is no exception happening in the first for-loop).
I am trying to figure out what's wrong.
There are two issues in your code as follows:
(1) In the reduce handler class,
valuesis ofIterabletype, which can only be iterated once. If you want to iterate multiple times, you can add the iteration results to a list and then iterate over the list, as shown below:(2) Improper use of
job.setCombinerClass, when settingjob.setCombinerClass(ReducerClassW1.class), the output result ofCombinerClasshas a value of string type. Errors occur when executingLong.parseLong(value.toString())in Reduce, so the code shown below can be removed:Regarding your Issue 2: After fixing the issues in the code, there will be no error messages. Finally, after executing the code on my machine, the results were successfully output in the set output directory, and there was corresponding output data:
If you need the source code files that run locally, I can also provide them.