Does Spark respect Kudu's hash partitioning, similar to bucketed joins on Parquet tables?


I'm trying out Kudu with Spark. I want to join two tables with the following schemas:

# This table has around 1 million records
TABLE dimensions (
    id INT32 NOT NULL,
    PRIMARY KEY (id)
)
HASH (id) PARTITIONS 32,
RANGE (id) (
    PARTITION UNBOUNDED
)
OWNER root
REPLICAS 1

# This table has 500 million records
TABLE facts (
    id INT32 NOT NULL,
    date DATE NOT NULL,
    PRIMARY KEY (id, date)
)
HASH (id) PARTITIONS 32,
RANGE (id, date) (
    PARTITION UNBOUNDED
)
OWNER root
REPLICAS 1

I inserted data into these tables using the following script:

// Load the CSV data into a Spark DataFrame
val dimensions_raw = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/root/dimensions.csv")

dimensions_raw.printSchema
dimensions_raw.createOrReplaceTempView("dimensions_raw")

// Kudu requires primary key columns to be non-nullable,
// so rewrite the DataFrame schema to mark them as such.
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
def setNotNull(df: DataFrame, columns: Seq[String]): DataFrame = {
  val schema = df.schema
  // Rebuild the StructField for each specified column with nullable = false.
  val newSchema = StructType(schema.map {
    case StructField(c, t, _, m) if columns.contains(c) => StructField(c, t, nullable = false, m)
    case y: StructField => y
  })
  // Apply new schema to the DataFrame
  df.sqlContext.createDataFrame(df.rdd, newSchema)
}
val primaryKeyCols = Seq("id") // `primaryKeyCols` for `facts` table is `(id, date)`
val dimensions_prep = setNotNull(dimensions_raw, primaryKeyCols)
dimensions_prep.printSchema
dimensions_prep.createOrReplaceTempView("dimensions_prep")


// Create a Kudu table
import collection.JavaConverters._
import org.apache.kudu.client._
import org.apache.kudu.spark.kudu._
val kuduContext = new KuduContext("localhost:7051", spark.sparkContext)

// Delete the table if it already exists.
if(kuduContext.tableExists("dimensions")) {
    kuduContext.deleteTable("dimensions")
}

kuduContext.createTable("dimensions", dimensions_prep.schema,
  /* primary key */ primaryKeyCols,
  new CreateTableOptions()
    .setNumReplicas(1)
    .addHashPartitions(List("id").asJava, 32))

// Insert the DataFrame's rows into the Kudu table
kuduContext.insertRows(dimensions_prep, "dimensions")
// Create a DataFrame that points to the Kudu table we want to query.
val dimensions = spark.read
    .option("kudu.master", "localhost:7051")
    .option("kudu.table", "dimensions")
    .format("kudu").load
dimensions.createOrReplaceTempView("dimensions")

I ran the same script for the facts table as well (the differing parts are sketched below).
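Roughly, only the key columns and the table name change; a sketch, assuming facts_raw is loaded from the facts CSV the same way as dimensions_raw:

val factsPrimaryKeyCols = Seq("id", "date") // composite primary key

val facts_prep = setNotNull(facts_raw, factsPrimaryKeyCols)

// Delete the table if it already exists.
if (kuduContext.tableExists("facts")) {
    kuduContext.deleteTable("facts")
}

kuduContext.createTable("facts", facts_prep.schema,
  /* primary key */ factsPrimaryKeyCols,
  new CreateTableOptions()
    .setNumReplicas(1)
    .addHashPartitions(List("id").asJava, 32)) // hashed on id only, matching the schema above

kuduContext.insertRows(facts_prep, "facts")

val facts = spark.read
    .option("kudu.master", "localhost:7051")
    .option("kudu.table", "facts")
    .format("kudu").load
facts.createOrReplaceTempView("facts")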

I want to join facts with dimensions on id. I tried the following in Spark:

val query = facts.join(dimensions, facts.col("id") === dimensions.col("id"))
query.explain() // print the physical plan below
query.show()

And I get the following physical plan:
== Physical Plan ==
*(5) SortMergeJoin [id#0], [id#14], Inner
:- *(2) Sort [id#0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#0, 200), true, [id=#43]
:     +- *(1) Scan Kudu facts [id#0,date#1] PushedFilters: [], ReadSchema: struct<id:int,date:date...
+- *(4) Sort [id#14 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#14, 200), true, [id=#49]
      +- *(3) Scan Kudu dimensions [id#14] PushedFilters: [], ReadSchema: struct<id:int>

My question: how do I tell Spark that the tables are already sorted on id (the join key), so there is no need to sort again? Moreover, the Exchange hashpartitioning step (the 200 is just the default spark.sql.shuffle.partitions) shouldn't be needed either, since both tables are already hash-partitioned on id in Kudu.
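(For this particular join, broadcasting the small dimensions table should sidestep the shuffle and sort entirely; a minimal sketch:

import org.apache.spark.sql.functions.broadcast

// Hint Spark to broadcast the ~1M-row dimensions table; the plan then
// shows a BroadcastHashJoin with no Exchange or Sort on the facts side.
val broadcastQuery = facts.join(broadcast(dimensions), facts.col("id") === dimensions.col("id"))
broadcastQuery.explain()

But I'm interested in the general case where both sides are too large to broadcast, where the Kudu hash partitioning could in principle be reused, as with bucketed Parquet tables.)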

The join query takes just under 100 seconds on a single machine running a single master and a single tablet server. Am I doing something wrong here, or is this the expected speed with Kudu for this kind of query?
