registerTempTable() doesn't register all records

I am trying to write a function that reads data from a relational database and inserts it into a Hive table. Since I use Spark 1.6, I need to register a temporary table first, because writing a DataFrame directly as a Hive table is not compatible with Hive:

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

spark_conf = SparkConf()
sc = SparkContext(conf=spark_conf)
sqlContext = HiveContext(sc)

query = "(select * from employees where emp_no < 10008) as emp_alias"
df = sqlContext.read.format("jdbc").option("url", url) \
        .option("dbtable", query) \
        .option("user", user) \
        .option("password", pswd).load()

df.registerTempTable('tempEmp')

sqlContext.sql('insert into table employment_db.hive_employees select * from tempEmp')

The employees table in the relational database contains a few thousand records. After running my program I can see that two parquet files are created:

  • a file, which is created after my code finishes
  • a file, which is created after two hours

So when I try to select from the Hive table after the job is completed, some records are missing.

I have several ideas about what could be causing the problem:

  1. Could it be caused by lazy evaluation of registerTempTable? Does Spark think that I don't use those records? I am familiar with lazy evaluation in generators, but I can't imagine how exactly it works in the registerTempTable function.
  2. Does it save the temporary tables in a tmp folder? Could the problem be caused by insufficient space? Should I use the dropTempTable function?
  3. Would it be safer to use createOrReplaceTempView (registerTempTable is deprecated in its favor in Spark 2)?
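Regarding idea 1, the lazy-evaluation behaviour can be sketched with a plain-Python analogy (this is not Spark itself, just an illustration): a generator, like a Spark transformation or a registered temp table, does no work until something consumes it.

```python
# Plain-Python analogy (not Spark): a generator, like a Spark
# transformation, does no work until an action consumes it.
log = []

def read_rows():
    for i in range(3):
        log.append(i)   # side effect records when a row is actually read
        yield i

rows = read_rows()      # analogous to defining a DataFrame / temp table
print(log)              # [] -- nothing has been read yet

result = list(rows)     # analogous to an action (e.g. the INSERT ... SELECT)
print(log)              # [0, 1, 2] -- the read happened only now
```

In this analogy, registerTempTable just attaches a name to a lazy definition; the actual JDBC read is triggered by the INSERT statement, which is the action.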

More info

  • using Spark 1.6 on Yarn (Hadoop 2.6.0-cdh5.8.0)
  • running multiple jobs with different HiveContexts, but I don't access the temporary tables across contexts

1 Answer

Vinayak Dornala

Can you check with df.saveAsTable("db.tempEmp")?

1 Create a new file employee.txt with the content below.

[root@quickstart spark]# vi employee.txt 

Name, Age
Vinayak, 35
Nilesh, 37
Raju, 30
Karthik, 28
Shreshta,1
Siddhish, 2

2 Execute the commands below in spark-shell:

val employee = sc.textFile("file:///home/cloudera/workspace/spark/employee.txt")
val employeefirst = employee.first
val employeeMap = employee.
  filter(e => e != employeefirst).
  map(e => {
    val splitted = e.split(",")
    val name = splitted(0).trim
    val age = scala.util.Try(splitted(1).trim.toInt).getOrElse(0)
    (name, age)
  })

val employeeDF = employeeMap.toDF("Name", "age")

scala>  employeeDF.show()
+--------+---+
|    Name|age|
+--------+---+
| Vinayak| 35|
|  Nilesh| 37|
|    Raju| 30|
| Karthik| 28|
|Shreshta|  1|
|Siddhish|  2|
+--------+---+
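The header-skip-and-parse step above can also be mirrored in plain Python (a hypothetical standalone sketch, no Spark required), which makes the default-age-of-0 fallback easy to verify:

```python
# Plain-Python mirror of the Scala filter/map above: skip the header,
# split on commas, trim whitespace, and default a bad age to 0.
lines = [
    "Name, Age",
    "Vinayak, 35",
    "Shreshta,1",
    "Badrow, not_a_number",  # hypothetical malformed row
]

header = lines[0]

def parse(line):
    name, age = line.split(",")
    try:
        age_int = int(age.strip())
    except ValueError:
        age_int = 0          # same fallback as scala.util.Try(...).getOrElse(0)
    return (name.strip(), age_int)

employees = [parse(l) for l in lines if l != header]
print(employees)  # [('Vinayak', 35), ('Shreshta', 1), ('Badrow', 0)]
```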

3 Create a new database.

hive> create database employeetest;
OK
Time taken: 0.325 seconds
hive> use employeetest;
OK
Time taken: 0.153 seconds

4 Create a table in the employeetest database.

scala> employeeDF.saveAsTable("employeetest.Employee")

hive> show tables;
OK
employee
Time taken: 0.171 seconds, Fetched: 1 row(s)
hive> select * from employee;
OK
Vinayak 35
Nilesh  37
Raju    30
Karthik 28
Shreshta    1
Siddhish    2
Time taken: 0.462 seconds, Fetched: 6 row(s)

Alternatively, you can use the method below to create a Hive table from spark-shell:

scala> employeeDF.registerTempTable("employeetohive")

scala> employeeDF.sqlContext.sql("select * from employeetohive").show
+--------+---+
|    Name|age|
+--------+---+
| Vinayak| 35|
|  Nilesh| 37|
|    Raju| 30|
| Karthik| 28|
|Shreshta|  1|
|Siddhish|  2|
+--------+---+


scala> employeeDF.sqlContext.sql("create table employeetest.employeefromdf select * from employeetohive").show

hive> show tables;
OK
employee
employeefromdf
Time taken: 0.101 seconds, Fetched: 2 row(s)
hive> select * from employeefromdf;
OK
Vinayak 35
Nilesh  37
Raju    30
Karthik 28
Shreshta    1
Siddhish    2
Time taken: 0.246 seconds, Fetched: 6 row(s)