Spark not using Direct Join over DSE

I'm developing a Spark Streaming job that joins streaming data with a Cassandra table. As you can see in the explain plan below, the Direct Join is not used. According to the DSE documentation, the Direct Join is used when (table size * directJoinSizeRatio) > size of keys. In my case the table has millions of records while the keys coming from the stream are a single record, so I was expecting the Direct Join to be used. The table radice_polizza has only the id_cod_polizza column as partition key. Connector version: 2.5.1. DSE version: 6.7.6.

== Physical Plan ==
*Project [id_cod_polizza#86L, progressivo#11, id3_numero_polizza#25, id3_cod_compagnia#21]
+- *SortMergeJoin [id_cod_polizza#86L], [id_cod_polizza#10L], Inner
   :- *Sort [id_cod_polizza#86L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id_cod_polizza#86L, 200)
   :     +- *Project [value#84L AS id_cod_polizza#86L]
   :        +- *SerializeFromObject [input[0, bigint, false] AS value#84L]
   :           +- Scan ExternalRDDScan[obj#83L]
   +- *Sort [id_cod_polizza#10L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id_cod_polizza#10L, 200)
         +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [id_cod_polizza#10L,progressivo#11,id3_numero_polizza#25,id3_cod_compagnia#21] ReadSchema: struct<id_cod_polizza:bigint,progressivo:string,id3_numero_polizza:string,id3_cod_compagnia:string>
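
From what I understand, the automatic decision can also be overridden through configuration; the property names below are my assumption based on the connector documentation, so treat this as a sketch rather than something I have verified:

    // Assumed SCC 2.5 setting names (not verified): override the automatic
    // direct-join decision globally instead of per-Dataset.
    spark.conf.set("directJoinSetting", "on")     // "on" | "off" | "auto" (default)
    spark.conf.set("directJoinSizeRatio", "0.9")  // threshold used in "auto" mode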

Here is my code:

var radice_polizza = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "radice_polizza", "keyspace" -> "preferred_temp"))
  .load()
  .select(
    "id_cod_polizza",
    "progressivo",
    "id3_numero_polizza",
    "id3_cod_compagnia")

if (mode == LoadMode.DIFF) {
  val altered_data_df = altered_data.idCodPolizzaList.toDF("id_cod_polizza")
  radice_polizza = altered_data_df.join(radice_polizza, Seq("id_cod_polizza"))
  radice_polizza.explain()
}

Forcing the Direct Join makes it work:

radice_polizza = altered_data_df.join(radice_polizza.directJoin(AlwaysOn), Seq("id_cod_polizza"))

== Physical Plan ==
*Project [id_cod_polizza#58L, progressivo#11, id3_numero_polizza#25, id3_cod_compagnia#21]
+- DSE Direct Join [id_cod_polizza = id_cod_polizza#58L] preferred_temp.radice_polizza - Reading (id_cod_polizza, progressivo, id3_numero_polizza, id3_cod_compagnia) Pushed {}
   +- *Project [value#56L AS id_cod_polizza#58L]
      +- *SerializeFromObject [input[0, bigint, false] AS value#56L]
         +- Scan ExternalRDDScan[obj#55L]
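
For completeness, the explicit directJoin call above relies on the connector's implicits being in scope; as far as I can tell from the SCC 2.5 API, both the directJoin method and AlwaysOn come from the org.apache.spark.sql.cassandra package:

    // Assumed SCC 2.5 import: brings the .directJoin(...) extension method and
    // the AlwaysOn / AlwaysOff / Automatic direct-join settings into scope.
    import org.apache.spark.sql.cassandra._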

Why is the Direct Join not used automatically?

Thank you.

1 Answer

Answered by Alex Ott:

The DSE Direct Join is enabled automatically when you build your application with the DSE Analytics dependencies, which are provided at runtime when the job runs on DSE Analytics. For that you need to specify the following dependency, and not pull in the open-source Spark Cassandra Connector:

    <dependency>
      <groupId>com.datastax.dse</groupId>
      <artifactId>dse-spark-dependencies</artifactId>
      <version>${dse.version}</version>
      <scope>provided</scope>
    </dependency>

If you run your job on external Spark, then you need to enable the Direct Join explicitly by setting the Spark configuration property spark.sql.extensions to com.datastax.spark.connector.CassandraSparkExtensions.
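
For example, when creating the session programmatically (a sketch; only the spark.sql.extensions setting matters here, the application name and contact point are placeholders):

    import org.apache.spark.sql.SparkSession

    // Register the connector's Catalyst extensions (which include the direct
    // join optimization) before the SparkSession is created.
    val spark = SparkSession.builder()
      .appName("direct-join-example")                          // placeholder
      .config("spark.sql.extensions",
        "com.datastax.spark.connector.CassandraSparkExtensions")
      .config("spark.cassandra.connection.host", "127.0.0.1") // placeholder
      .getOrCreate()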

I have a long blog post on joining data with Cassandra that explains all of these things.