UPD: The question is no longer valid, as it turned out that two of the 100 tables had several orders of magnitude more rows than the rest (which had ~500). Once the "bad" tables are eliminated, the join is distributed fairly and completes in predictable time.
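(A quick way to spot such outliers, assuming the input DataFrames are kept in a list; the name dfs is illustrative:)

# Print the row count of each input DataFrame to find the skewed ones.
for i, df in enumerate(dfs):
    print(i, df.count())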
I have about 100 Spark DataFrames, <=500 rows each and all roughly the same size (I plan to have tens of thousands of rows later). The ids of the entries in all of the tables are subsets of the ids in the first (reference) table.
I want to left outer join all of the tables to the first one by id. I do it as follows (in pyspark):
df1.join(df2, df2.id == df1.id, 'left_outer') \
   .join(df3, df3.id == df1.id, 'left_outer') \
   ...
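For completeness, the same chain can be built programmatically; a minimal sketch, assuming the remaining DataFrames are collected in a list named other_dfs (the name is illustrative):

from functools import reduce

# Fold every DataFrame in other_dfs onto the reference table df1
# with a left outer join on id.
joined = reduce(
    lambda acc, df: acc.join(df, df.id == df1.id, 'left_outer'),
    other_dfs,
    df1)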
This join operation generates 200 jobs, all but a few of which finish in a couple of seconds. The last job, however, takes extremely long (an hour or so) and runs on only one processor. The Spark web UI reveals that this job has received a disproportionately large number of shuffle records.
Why is this happening and how is it better to tune Spark to avoid this?
The query "explain select * from ... left outer join ... ... ..." looks as follows:
== Physical Plan ==
Project [id#0, ... rest of the columns (~205) ...]
HashOuterJoin [id#0], [id#370], LeftOuter, None
HashOuterJoin [id#0], [id#367], LeftOuter, None
HashOuterJoin [id#0], [id#364], LeftOuter, None
...
Exchange (HashPartitioning [id#364], 200)
Project [...cols...]
PhysicalRDD [...cols...], MapPartitionsRDD[183] at map at newParquet.scala:542
Exchange (HashPartitioning [id#367], 200)
Project [...cols...]
PhysicalRDD [...cols...], MapPartitionsRDD[185] at map at newParquet.scala:542
Exchange (HashPartitioning [id#370], 200)
Project [...cols...]
PhysicalRDD [...cols...], MapPartitionsRDD[187] at map at newParquet.scala:542
Adding a repartition may help.
I ran into a similar situation: join two DataFrames with 200 partitions, then join again, and it never finishes.
Adding repartition(50) to the DataFrames being joined made it work for me.
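Roughly what that looked like (a minimal sketch; 50 was just the count that worked for my data, and the variable names are illustrative):

# Repartition both inputs before the join; 50 partitions worked in my case,
# but the right number depends on your data and cluster.
left = df1.repartition(50)
right = df2.repartition(50)
joined = left.join(right, right.id == left.id, 'left_outer')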