For the following scrap of code:
case class SomeRow(key: String, value: String)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val ds1 = Seq(SomeRow("A", "1")).toDS().repartition(col("key"))
val ds2 = Seq(SomeRow("A", "1"), SomeRow("B","2")).toDS().repartition(col("key"))
val dataSetJoined = ds1.joinWith(ds2, ds1("key")===ds2("key"), "left")
val dataFrameJoined = ds1.join(ds2, ds1("key")===ds2("key"), "left")
dataSetJoined.explain(true)
dataFrameJoined.explain(true)
Spark generates following plan for dataSet:
== Physical Plan ==
SortMergeJoin [_1#132.key], [_2#133.key], LeftOuter
:- *(2) Sort [_1#132.key ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(_1#132.key, 2)
: +- *(1) Project [named_struct(key, key#122, value, value#123) AS _1#132]
: +- Exchange hashpartitioning(key#122, 2)
: +- LocalTableScan [key#122, value#123]
+- *(4) Sort [_2#133.key ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(_2#133.key, 2)
+- *(3) Project [named_struct(key, key#128, value, value#129) AS _2#133]
+- Exchange hashpartitioning(key#128, 2)
+- LocalTableScan [key#128, value#129]
and for dataFrame:
== Physical Plan ==
SortMergeJoin [key#122], [key#128], LeftOuter
:- *(1) Sort [key#122 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(key#122, 2)
: +- LocalTableScan [key#122, value#123]
+- *(2) Sort [key#128 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(key#128, 2)
+- LocalTableScan [key#128, value#129]
Is it possible to avoid another same exchanging when joining two dataSets using joinWith?