How to use a whole Hive database in Spark and read SQL queries from external files?


I am using the Hortonworks sandbox on Azure with Spark 1.6. I have a Hive database populated with TPC-DS sample data. I want to read SQL queries from external files and run them against the Hive dataset in Spark. I followed the topic Using hive database in spark, but it only uses a single table from my dataset and still writes the SQL query inside Spark, whereas I need to define the whole database as my source to query against. I think I should use DataFrames, but I am not sure and do not know how. I also want to import the SQL queries from external .sql files rather than writing the queries again in code. Could you please guide me on how to do this? Thank you very much!


1 Answer

Sandeep Singh (Best Answer)

Spark can read data directly from Hive tables. You can create and drop Hive tables with Spark, and you can run all Hive HQL operations through Spark. For this you need to use Spark's HiveContext.

From the Spark documentation:

Spark HiveContext provides a superset of the functionality provided by the basic SQLContext. Additional features include the ability to write queries using the more complete HiveQL parser, access to Hive UDFs, and the ability to read data from Hive tables. To use a HiveContext, you do not need to have an existing Hive setup.

For more information, see the Spark documentation.
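For example, since your TPC-DS data is already in a Hive database, you can point a HiveContext at that database and query any of its tables directly. A minimal sketch (the database name tpcds and the table store_sales are placeholders for whatever names your sandbox actually uses):

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("tpcds-queries"))
val hiveContext = new HiveContext(sc)

// Switch to the database that holds the TPC-DS tables
// ("tpcds" is a placeholder; use your own database name).
hiveContext.sql("USE tpcds")

// Every table in that database is now queryable without registering anything.
hiveContext.tables().show()
hiveContext.sql("SELECT count(*) FROM store_sales").show()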

To avoid writing SQL in code, you can use a properties file that holds all your Hive queries and then refer to each query by its key in your code.

Below is an implementation of Spark HiveContext and the use of a properties file in Spark Scala.

package com.spark.hive.poc

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import org.apache.hadoop.fs.{ FileSystem, Path }

// Row and the Spark SQL data types used to build the schema
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ StructType, StructField, StringType }

object ReadPropertyFiles extends Serializable {

  val conf = new SparkConf().setAppName("read local file")
  conf.set("spark.executor.memory", "100M")
  conf.setMaster("local")

  val sc = new SparkContext(conf)
  val sqlContext = new HiveContext(sc)

  def main(args: Array[String]): Unit = {

    // Load the properties file (path passed as the first argument) from HDFS
    val hadoopConf = new org.apache.hadoop.conf.Configuration()
    val fileSystem = FileSystem.get(hadoopConf)
    val propertiesPath = new Path(args(0))
    val inputStream = fileSystem.open(propertiesPath)
    val properties = new java.util.Properties
    properties.load(inputStream)
    inputStream.close()

    // Create an RDD from the input files
    val people = sc.textFile("/user/User1/spark_hive_poc/input/")
    // The schema is encoded in a string
    val schemaString = "name address"

    // Generate the schema based on the schema string
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

    // Convert records of the RDD (people) to Rows
    val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
    // Apply the schema to the RDD
    val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    peopleDataFrame.printSchema()

    // Register the DataFrame as a temporary table so it can be queried with SQL
    peopleDataFrame.registerTempTable("tbl_temp")

    // Run the query stored under the temp_table key
    val data = sqlContext.sql(properties.getProperty("temp_table"))

    // Drop Hive table
    sqlContext.sql(properties.getProperty("drop_hive_table"))
    // Create Hive table
    sqlContext.sql(properties.getProperty("create_hive_table"))
    // Insert data into Hive table
    sqlContext.sql(properties.getProperty("insert_into_hive_table"))
    // Select data from the Hive table
    sqlContext.sql(properties.getProperty("select_from_hive")).show()

    sc.stop()
  }
}

Entries in the properties file:

temp_table=select * from tbl_temp
drop_hive_table=DROP TABLE IF EXISTS default.test_hive_tbl
create_hive_table=CREATE TABLE IF NOT EXISTS default.test_hive_tbl(name string, city string) STORED AS ORC
insert_into_hive_table=insert overwrite table default.test_hive_tbl select * from tbl_temp
select_from_hive=select * from default.test_hive_tbl

spark-submit command to run this job:

[User1@hadoopdev ~]$ spark-submit --num-executors 1 \
--executor-memory 100M --total-executor-cores 2 --master local \
--class com.spark.hive.poc.ReadPropertyFiles Hive-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
/user/User1/spark_hive_poc/properties/sql.properties

Note: the properties file location passed as the argument must be an HDFS path.
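If you would rather keep each query in its own .sql file (as asked in the question) instead of a properties file, the same idea applies: read the file contents from HDFS into a string and pass it to sqlContext.sql. A rough sketch, assuming each file contains a single statement and the path shown is only an example:

import org.apache.hadoop.fs.{ FileSystem, Path }
import scala.io.Source

// Read the whole .sql file from HDFS into a single string
// ("/user/User1/queries/query01.sql" is a hypothetical path).
val fs = FileSystem.get(sc.hadoopConfiguration)
val in = fs.open(new Path("/user/User1/queries/query01.sql"))
val query = Source.fromInputStream(in).mkString.trim.stripSuffix(";")
in.close()

// Run the query against the Hive database and show the result.
sqlContext.sql(query).show()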