How to import table data stored in Hive in my MapReduce job?


I am using a single-node cluster setup of Apache Hadoop 2.5.0 on Ubuntu 14.04. I stored tweets in HDFS using Flume, and then used the following Hive command to create a table in Hive that stores all the tweets in tabular format:

CREATE EXTERNAL TABLE tweets (
   id BIGINT,
   created_at STRING,
   source STRING,
   favorited BOOLEAN,
   retweet_count INT,
   retweeted_status STRUCT<
      text:STRING,
      user:STRUCT<screen_name:STRING,name:STRING>>,
   entities STRUCT<
      urls:ARRAY<STRUCT<expanded_url:STRING>>,
      user_mentions:ARRAY<STRUCT<screen_name:STRING,name:STRING>>,
      hashtags:ARRAY<STRUCT<text:STRING>>>,
   text STRING,
   user STRUCT<
      screen_name:STRING,
      name:STRING,
      friends_count:INT,
      followers_count:INT,
      statuses_count:INT,
      verified:BOOLEAN,
      utc_offset:INT,
      time_zone:STRING>,
   in_reply_to_screen_name STRING
)
ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
LOCATION '/user/flume/tweets';
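
A note on the SerDe: 'com.cloudera.hive.serde.JSONSerDe' is not bundled with Hive; it comes from a separate jar (the hive-serdes jar from Cloudera's Twitter example), which has to be made available to Hive before the table can be queried, with something along these lines (the path is a placeholder for wherever the jar actually lives):

ADD JAR /path/to/hive-serdes-1.0-SNAPSHOT.jar;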

I have verified that the data exists in the table 'tweets' by querying it with HiveQL from the Hive command-line interface, for example with a simple query along these lines:
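
SELECT id, text FROM tweets LIMIT 5;

I also created an output table using the following command: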

CREATE TABLE outputtable (
    a STRING,
    b INT );

I am using Apache Hive 0.13.1, which already includes HCatalog. After all this, I am trying to write a MapReduce job in Java using Eclipse. I have added the following libraries to my project as external jars (see the classpath note after this list):

  1. All the libraries present in path-of-installation-of-hadoop/share/hadoop/common
  2. All the libraries present in path-of-installation-of-hadoop/share/hadoop/mapreduce
  3. All the libraries present in the lib folder of Hive
  4. All the libraries present in path-of-installation-of-Hive/hcatalog/share/hcatalog
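
One thing worth noting: jars added this way are only on the Eclipse build path. When the job is later launched with hadoop jar, the Hive and HCatalog jars presumably need to be on the runtime classpath as well, for example along these lines (the paths are placeholders for the actual installation directories):

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/path-of-installation-of-Hive/lib/*:/path-of-installation-of-Hive/hcatalog/share/hcatalog/*
hadoop jar MyProject.jar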

My MapReduce job is meant to read the text of the tweets from the table 'tweets' and then process it. The code is:

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hcatalog.common.*;
import org.apache.hcatalog.mapreduce.*;
import org.apache.hcatalog.data.*;
import org.apache.hcatalog.data.schema.*;

public class UseHCat extends Configured implements Tool {

    public static class Map extends Mapper<WritableComparable, HCatRecord, Text, IntWritable> {

        @Override
        protected void map(WritableComparable key, HCatRecord value, Context context)
                throws IOException, InterruptedException {
            // Field 7 of the HCatRecord is the 'text' column of the 'tweets' table
            // (fields are indexed in the order of the CREATE TABLE statement, starting at 0).
            String tweetText = (String) value.get(7);
            context.write(new Text(tweetText), new IntWritable(1));
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, WritableComparable, HCatRecord> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Only the first value is consumed here, so 'b' is always 1;
            // counting all occurrences would require iterating over the whole iterable.
            Iterator<IntWritable> iter = values.iterator();
            IntWritable iw = iter.next();
            int id = iw.get();
            // Build a two-field record matching the schema of 'outputtable' (a STRING, b INT).
            HCatRecord record = new DefaultHCatRecord(2);
            record.set(0, key.toString());
            record.set(1, id);
            context.write(null, record);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        String inputTableName = "tweets";
        String outputTableName = "outputtable";
        String dbName = null; // null selects the default database

        Job job = new Job(conf, "UseHCat");
        HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));
        job.setJarByClass(UseHCat.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // An HCatalog record as input
        job.setInputFormatClass(HCatInputFormat.class);

        // Mapper emits a string as key and an integer as value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Ignore the key for the reducer output; emit an HCatalog record as value
        job.setOutputKeyClass(WritableComparable.class);
        job.setOutputValueClass(DefaultHCatRecord.class);
        job.setOutputFormatClass(HCatOutputFormat.class);

        HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
        HCatSchema s = HCatOutputFormat.getTableSchema(job);
        System.err.println("INFO: output schema explicitly set for writing: " + s);
        HCatOutputFormat.setSchema(job, s);
        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new UseHCat(), args);
        System.exit(exitCode);
    }
}

The first problem I am facing is that I get many warnings saying that some of the types and constructors are deprecated. I ignored the warnings and created a jar file of my project, with 'UseHCat' as its main class. Then I changed to the directory where the jar file was created and ran the following command from the terminal:

hadoop jar MyProject.jar

We got the following error:

14/11/16 17:17:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hive/ql/metadata/HiveStorageHandler
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at org.apache.hcatalog.mapreduce.InitializeInput.getInputJobInfo(InitializeInput.java:146)
    at org.apache.hcatalog.mapreduce.InitializeInput.setInput(InitializeInput.java:86)
    at org.apache.hcatalog.mapreduce.HCatInputFormat.setInput(HCatInputFormat.java:86)
    at org.apache.hcatalog.mapreduce.HCatInputFormat.setInput(HCatInputFormat.java:55)
    at org.apache.hcatalog.mapreduce.HCatInputFormat.setInput(HCatInputFormat.java:47)
    at UseHCat.run(UseHCat.java:64)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
    at UseHCat.main(UseHCat.java:91)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.metadata.HiveStorageHandler
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 26 more

1 Answer

Answer by Muthu:

Hive was developed to minimize the need to write MapReduce programs. You can do this kind of processing with Hive queries; internally, Hive converts them into MapReduce jobs.
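
For example, a rough HiveQL equivalent of what the job above attempts (assuming the intent is to count how often each tweet text occurs) would be something along these lines:

INSERT OVERWRITE TABLE outputtable
SELECT text, CAST(COUNT(*) AS INT) FROM tweets GROUP BY text;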

However, if you want to access the Hive data directly, you can. Hive is not a database: the data sits in HDFS in readable form, under the Hive warehouse directory for managed tables, or, as with your external 'tweets' table, at the LOCATION given in the DDL ('/user/flume/tweets'). You can give that full path as the input to an ordinary MapReduce program.
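
A minimal sketch of a driver that reads those raw files directly, assuming the files under /user/flume/tweets are plain-text JSON with one tweet per line (the pass-through mapper and the output path are only illustrative):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReadTweetsDirectly {

    // Emits every raw JSON line as-is; real processing (e.g. JSON parsing) would go here.
    public static class PassThroughMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "ReadTweetsDirectly");
        job.setJarByClass(ReadTweetsDirectly.class);
        job.setMapperClass(PassThroughMapper.class);
        job.setNumReduceTasks(0); // map-only job
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Read the external table's data files directly from HDFS.
        FileInputFormat.addInputPath(job, new Path("/user/flume/tweets"));
        FileOutputFormat.setOutputPath(job, new Path("/user/flume/tweets-out"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}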

Have you tried running a sample MapReduce program in Eclipse? You can either build the Hadoop Eclipse plugin or use an existing plugin in your Eclipse to run MapReduce jobs.