Does Spark support the WITH clause like SQL?


I have a table employee_1 in Spark with attributes id and name (with data), and another table, employee_2, with the same attributes. I want to load the data from employee_1 into employee_2, incrementing the id values by 1.

My WITH clause is shown below:

WITH EXP AS (SELECT  ALIASNAME.ID+1 ID, ALIASNAME.NAME NAME FROM employee_1 ALIASNAME)
INSERT INTO TABLE employee_2 SELECT * FROM EXP;

Steps of execution:

I have a file (with data) in an HDFS location.

  1. Creating an RDD based on the HDFS location.
  2. RDD to a Hive temporary table
  3. From the temporary table to the Hive Target (employee_2).
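Under stated assumptions (a HiveContext bound to `sqlContext`, a comma-delimited file, and illustrative paths and names, none of which are confirmed in the question), the three steps above can be sketched as:

```scala
// Sketch only: sc, sqlContext, the HDFS path, and the field delimiter
// are assumptions about the environment, not confirmed details.
import sqlContext.implicits._

case class Employee(id: Int, name: String)

// 1. RDD from the file in the HDFS location
val rdd = sc.textFile("hdfs:///path/to/employee_data")

// 2. RDD -> Hive temporary table
val empDF = rdd.map(_.split(","))
               .map(a => Employee(a(0).trim.toInt, a(1).trim))
               .toDF()
empDF.registerTempTable("employee_1")

// 3. Temporary table -> Hive target, shifting id by +1
val exp = sqlContext.sql(
  "WITH EXP AS (SELECT id + 1 AS id, name FROM employee_1) SELECT * FROM EXP")
exp.insertInto("employee_2")
```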

When I run it from a test program on the backend, it succeeds, but no data is loaded: employee_2 stays empty.

Note:

If you run the above WITH clause in Hive, it succeeds and the data loads. But in Spark 1.6 it doesn't. Why?


There are 2 answers

Garren S (best answer):

The WITH statement is not the problem; it's the INSERT INTO statement that's causing trouble.

Here's a working example that uses the .insertInto() style instead of the "INSERT INTO" SQL:

// Build a small DataFrame and register it as a temp table
val s = Seq((1, "foo"), (2, "bar"))
val df = s.toDF("id", "name")
df.registerTempTable("df")

// Create the target Hive table
sql("CREATE TABLE edf_final (id int, name string)")

// Run the CTE, then load its result with the DataFrame API
val e = sql("WITH edf AS (SELECT id + 1 AS id, name FROM df) SELECT * FROM edf")
e.insertInto("edf_final")

Another option is to use the df.write.mode("append").saveAsTable("edf_final") style.
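For example, assuming `e` is the DataFrame produced by the WITH query above, the append variant would be a one-liner (a sketch, not verified against every Spark version):

```scala
// Append the CTE result to the table by name rather than by position.
// Note: insertInto matches columns positionally, while append +
// saveAsTable resolves them by name, so alias id + 1 explicitly.
val e = sql("WITH edf AS (SELECT id + 1 AS id, name FROM df) SELECT * FROM edf")
e.write.mode("append").saveAsTable("edf_final")
```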

Relevant SO: "INSERT INTO ..." with SparkSQL HiveContext

Ram Ghadiyaram:

Since the accepted answer uses the older API, here is the same example rewritten the Spark 2.x/3.x way.

%scala

import org.apache.spark.sql.functions.col

val s = Seq((1,"foo"), (2, "bar"))
val df = s.toDF("id", "name")
df.createOrReplaceTempView("df")
spark.sql("CREATE TABLE IF NOT EXISTS edf_final (id int, name string)")
val e = spark.sql("WITH edf AS (SELECT id+1 AS id, name FROM df) SELECT * FROM edf")
e.select(col("id"), col("name")).write.insertInto("edf_final")

spark.sql("select * from edf_final").show

Result:

df:org.apache.spark.sql.DataFrame
id:integer
name:string
e:org.apache.spark.sql.DataFrame
id:integer
name:string
+---+----+
| id|name|
+---+----+
|  2| foo|
|  3| bar|
+---+----+