How do I use SparkSQL and its execution engine to query Hive databases and tables without invoking any part of the Hive execution engine?

I've written SELECT and JOIN statements that I can run from the Hive CLI, from beeline, or from Spark (2.3.1) with enableHiveSupport = TRUE. (Note: I'm using SparkR as my API.)

The join and write takes 30 minutes with beeline, but 3.5 HOURS with Spark and enableHiveSupport = TRUE. Either Spark and its connectors are crap, or I'm not using Spark the way I should be. Given everything I've read about Spark being the best thing since sliced bread, I'm probably not using it right.

I want to read from Hive tables, but I don't want Hive to do any of the work. I'd like to join several months of data, run a regression on each record's monthly delta, and then write my final slopes/betas to a Parquet output table that Hive can read if necessary, preferably partitioned the same way as the Hive tables I'm using as input.
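One possible reading of the regression step, sketched under assumptions: if the regression is an ordinary least-squares fit of an attribute against the month index 1..n, the slope is a fixed weighted sum of the monthly values, so it can be written as a plain SQL expression over wide columns instead of fitting a model per record. The helper names `slope_weights` and `slope_sql`, the column name `balance`, and the `<attr>_<i>` column pattern (column 1 being the oldest month) are illustrative assumptions, not part of the original script.

```r
# Closed-form OLS slope weights for equally spaced months 1..n_months.
# slope = sum(w * y), where w = (t - mean(t)) / sum((t - mean(t))^2)
slope_weights <- function(n_months) {
  month <- seq_len(n_months)
  centered <- month - mean(month)
  centered / sum(centered^2)
}

# Build a SQL expression that computes the slope as a weighted sum of the
# wide columns <attr>_1 .. <attr>_n (hypothetical naming, oldest month first).
slope_sql <- function(attr, n_months) {
  w <- slope_weights(n_months)
  terms <- sprintf("(%.10f * %s_%d)", w, attr, seq_len(n_months))
  paste0(paste(terms, collapse = " + "), " AS ", attr, "_slope")
}

# Sanity check on made-up values: the weighted sum should match lm().
y <- c(10, 12, 11, 15, 18, 17)
month <- seq_along(y)
slope_closed <- sum(slope_weights(length(y)) * y)
slope_lm <- unname(coef(lm(y ~ month))[2])

# Example expression for a hypothetical attribute named "balance".
balance_expr <- slope_sql("balance", 6)
```

The resulting expression could be dropped into the SELECT list of a statement submitted through SparkR's `sql()`, which would keep the per-record arithmetic inside Spark. If the regression is meant to run on month-to-month differences instead, the same idea applies with the weights recomputed for the differenced series.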

Here's some code, as requested, but I don't think you're going to learn anything from it. You're not going to get reproducible results with Big Data queries.

Sys.setenv(SPARK_HOME="/usr/hdp/current/spark2-client")
sessionInfo()
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.stop()
Sys.setenv(SPARKR_SUBMIT_ARGS="--master yarn sparkr-shell") #--master yarn-client sparkr-shell
Sys.setenv(LOCAL_DIRS="/tmp")
config <- list()
config$spark.cores.max <- 144L
config$spark.executor.cores <- 2L
config$spark.executor.memory <- "8g"
config$spark.driver.cores <- 6L
config$spark.driver.maxResultSize <- "0"
config$spark.driver.memory <- "32g"
config$spark.shuffle.service.enabled <- TRUE
config$spark.dynamicAllocation.enabled <- FALSE
config$spark.scheduler.mode <- "FIFO"
config$spark.ui.port <- 4044L
sparkR.session(master = "yarn",
           sparkHome = Sys.getenv("SPARK_HOME"),
           sparkConfig = config,
           enableHiveSupport = TRUE)
print("Connected!")

############ SET HIVE CONFIG 
collect(sql("SET hive.exec.dynamic.partition") )
sql("SET hive.exec.dynamic.partition=true")
collect(sql("SET hive.exec.dynamic.partition.mode"))
sql("SET hive.exec.dynamic.partition.mode=nonstrict")
##
start_time <- Sys.time()
############### READ IN DATA {FROM HIVE} 
sql('use historicdata')
data_tables<-collect(sql('show tables'))
exporttabs <- grep(pattern = 'export_historic_archive_records',x = data_tables$tableName,value = TRUE)
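# nMonths (the number of monthly tables to join) is not defined in this excerpt
jointabs<-sort(exporttabs)[length(exporttabs)-(nMonths-1):0]
currenttab<-jointabs[6] # hard-coded index; this is the latest table only when nMonths is 6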

############### CREATE TABLE AND INSERT SCRIPTS 
# hivedb, tab_suffix and hdAttr are not defined in this excerpt
sql(paste0('use ',hivedb))
sql(paste0('DROP TABLE IF EXISTS histdata_regression',tab_suffix))
sSelect<-paste0("Insert Into TABLE histdata_regression",tab_suffix," partition (scf) SELECT a.idkey01, a.ssn7")
sCreateQuery<-paste0("CREATE TABLE histdata_regression",tab_suffix," (idkey01 string, ssn7 string")
sFrom<-paste0("FROM historicdata.",jointabs[nMonths]," a")
sAlias<-letters[nMonths:1]
# Strip the common table-name prefix, leaving only the date suffix of each monthly table
DT <- gsub(pattern = "export_historic_archive_records_",replacement = "",jointabs)
for (i in nMonths:1) {
  sSelect<-paste0(sSelect,", ",sAlias[i],".",hdAttr," as ",hdAttr,"_",i,", ",sAlias[i],".recordid as recordid_",DT[i])
  sCreateQuery<-paste0(sCreateQuery,", ",hdAttr,"_",i," int, recordid_",DT[i]," int")
  if (i==1) sCreateQuery<-paste0(sCreateQuery,') PARTITIONED BY (scf string) STORED AS ORC')
  if (i==1) sSelect<-paste0(sSelect,", a.scf")
  if (i!=nMonths) sFrom<-paste0(sFrom," inner join historicdata.",jointabs[i]," ",sAlias[i]," on ",
                              paste(paste0(paste0("a.",c("scf","idkey01","ssn7")),"=",
                                     paste0(sAlias[i],".",c("scf","idkey01","ssn7"))),collapse=" AND "))
}

system(paste0('beeline -u "jdbc:hive2://myserver1.com,myserver2.com,myserver3.com,myserver4.com,myserver5.com/work;\
            serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -e "',sCreateQuery,'"'))

system(paste0("beeline -u \"jdbc:hive2://myserver1.com,myserver2.com,myserver3.com,myserver4.com,myserver5.com/work;\
            serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2\" -e \"",sSelect," ",sFrom,"\""))
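The listing above sends both statements through beeline, so HiveServer2 is what executes them. For comparison, here is a minimal sketch of how the same kind of statement could be assembled and then handed to Spark instead, assuming an active SparkR session created with enableHiveSupport = TRUE. The helper names `build_from` and `run_with_spark` and the table names `m1`, `m2`, `m3` are hypothetical; only `SparkR::sql` is a real SparkR function, and it is not called unless `run_with_spark` is invoked.

```r
# Build the inner-join FROM clause for a vector of monthly tables (oldest first).
# Mirrors the loop above: the newest table gets alias "a", older ones "b", "c", ...
build_from <- function(tables, db = "historicdata",
                       keys = c("scf", "idkey01", "ssn7")) {
  n <- length(tables)
  aliases <- letters[n:1]
  from <- paste0("FROM ", db, ".", tables[n], " a")
  for (i in rev(seq_len(n - 1))) {
    # Join every older table back to the newest one on all key columns.
    on <- paste(paste0("a.", keys, " = ", aliases[i], ".", keys),
                collapse = " AND ")
    from <- paste0(from, " INNER JOIN ", db, ".", tables[i], " ",
                   aliases[i], " ON ", on)
  }
  from
}

# Submit a statement through Spark SQL rather than beeline.
# Assumption: a SparkR session with Hive support is already running.
run_with_spark <- function(statement) {
  SparkR::sql(statement)
}

# Example with hypothetical table names; no Spark session is needed for this part.
from_clause <- build_from(c("m1", "m2", "m3"))
```

With a session running, `run_with_spark(paste(sSelect, sFrom))` would be the Spark-side counterpart of the second beeline call, which makes it possible to time the two engines on an identical statement.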