I tried to create a function which would get the data from a relational database and insert it into a Hive table. Since I use Spark 1.6, I need to register a temporary table, because writing a DataFrame directly as a Hive table is not compatible with Hive:
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

spark_conf = SparkConf()
sc = SparkContext(conf=spark_conf)
sqlContext = HiveContext(sc)

# subquery alias so the JDBC source reads only the filtered rows
# (url, user and pswd are connection parameters defined elsewhere)
query = "(select * from employees where emp_no < 10008) as emp_alias"
df = sqlContext.read.format("jdbc").option("url", url) \
    .option("dbtable", query) \
    .option("user", user) \
    .option("password", pswd).load()
df.registerTempTable('tempEmp')
sqlContext.sql('insert into table employment_db.hive_employees select * from tempEmp')
The employees table in the RDB contains a few thousand records. After running my program I can see that two parquet files are created:
- one file, created right after my code finishes
- a second file, created two hours later
So when I try to select from the Hive table after the job has completed, some records are missing.
I have several ideas about what could be causing the problem:
- Could it be caused by the lazy evaluation of registerTempTable? Does Spark think that I don't use those records? I am familiar with lazy evaluation in generators, but I can't imagine how exactly lazy evaluation works in the registerTempTable function.
- Does it save the temporary tables in the tmp folder? Could it be caused by running out of space there? Should I use the dropTempTable function (see the sketch after this list)?
- Is it safer to use createOrReplaceTempView (given that registerTempTable is deprecated in Spark 2)?
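To make the dropTempTable idea concrete, a minimal variant of my code that forces the JDBC read to evaluate before the insert and drops the temporary table afterwards would look like this (the persist/count calls are only my attempt to defeat lazy evaluation; I have not verified that this fixes the missing records):

# cache and force evaluation of the JDBC read, so the data is fetched once
df.persist()
print(df.count())

df.registerTempTable('tempEmp')
sqlContext.sql('insert into table employment_db.hive_employees select * from tempEmp')

# release the temporary table and the cached data once the insert has run
sqlContext.dropTempTable('tempEmp')
df.unpersist()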
More info
- using Spark 1.6 on YARN (Hadoop 2.6.0-cdh5.8.0)
- running multiple jobs with different HiveContexts, but I don't access the temporary tables across contexts
Can you check with df.write.saveAsTable("db.tempEmp")?
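In Spark 1.6 this call goes through the DataFrameWriter in PySpark, so I assume the suggestion would look roughly like this (the append mode is an assumption; note that by default the table is written in Spark's parquet-based format, which is the Hive-compatibility concern mentioned in the question):

# write the DataFrame directly as a table, skipping the temporary table
df.write.mode("append").saveAsTable("db.tempEmp")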
1. Create a new file employee.txt with the content below.
2. Execute the commands below in spark-shell.
3. Create a new database.
4. Create a table in the employeetest database.
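The file contents and shell commands referenced in these steps are missing from the answer; a minimal reconstruction might look like the sketch below. The columns, sample rows, delimiter, and the table name emp are all assumptions, and the sketch is in PySpark rather than spark-shell to match the language of the question.

# employee.txt (assumed sample content):
# 10001,Georgi
# 10002,Bezalel

from pyspark import SparkContext
from pyspark.sql import HiveContext, Row

sc = SparkContext()
sqlContext = HiveContext(sc)

# step 2: read the file and convert it into a DataFrame
lines = sc.textFile("employee.txt")
rows = lines.map(lambda l: l.split(",")) \
            .map(lambda p: Row(emp_no=int(p[0]), name=p[1]))
df = sqlContext.createDataFrame(rows)

# step 3: create a new database
sqlContext.sql("create database if not exists employeetest")

# step 4: create a table in the employeetest database and load the data
df.registerTempTable("tempEmployee")
sqlContext.sql("create table employeetest.emp as select * from tempEmployee")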
Alternatively, you can use the method below to create the table in Hive from spark-shell.
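The method itself is also missing from the answer; the usual alternative in Spark 1.6 is to let the DataFrameWriter create the Hive table directly from the DataFrame, along the lines of this sketch (the table name is an assumption):

# alternative: have Spark create the table straight from the DataFrame
df.write.saveAsTable("employeetest.emp")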