One job takes extremely long on multiple left joins in Spark SQL (1.3.1)


UPD: The question is not valid anymore as it turned out two of the 100 tables had several orders of magnitude more rows than the rest (which had 500). When "bad" tables are eliminated, the join is distributed fairly and completes in predictable time.
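
A check along these lines can expose such outlier tables before any join is attempted. It is only a sketch: it assumes the tables are kept in a dict called `tables` mapping a name to a DataFrame (a hypothetical layout, not something from the question), and the helper name `find_oversized` and the threshold `factor=10` are illustrative. The only Spark call it relies on is `DataFrame.count()`.

```python
def find_oversized(tables, factor=10):
    """Return {name: row_count} for tables much larger than the median table.

    `tables` is assumed to be a dict of name -> DataFrame (hypothetical layout).
    `factor` is an arbitrary, illustrative threshold.
    """
    if not tables:
        return {}
    # One Spark action per table; cheap for small tables, slower for huge ones.
    counts = {name: df.count() for name, df in tables.items()}
    ordered = sorted(counts.values())
    median = ordered[len(ordered) // 2]
    # Flag every table whose row count exceeds `factor` times the median.
    return {name: n for name, n in counts.items() if n > factor * max(median, 1)}
```

Anything this returns is worth inspecting (or filtering, or deduplicating by id) before it goes into the join chain.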


I have about 100 Spark DataFrames of roughly the same size, at most 500 rows each (I plan to have tens of thousands of rows later). The ids in every table are a subset of the ids in the first (reference) table.

I want to left outer join all of the tables to the first one by id. I do it as follows (in pyspark):

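# Parentheses let the chained calls span several lines in Python.
(df1.join(df2, df2.id == df1.id, 'left_outer')
    .join(df3, df3.id == df1.id, 'left_outer')
    # ... same for the remaining tables
)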
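
With about 100 tables, the chain above is easier to build programmatically than to write out by hand. The following is a minimal sketch, assuming the reference table is passed as `reference` and the remaining DataFrames as a list `others`; the helper name `left_join_all` is made up for illustration. It uses the same positional `join(other, condition, 'left_outer')` form as the snippet above.

```python
from functools import reduce


def left_join_all(reference, others, key="id"):
    """Left outer join every DataFrame in `others` onto `reference` by `key`.

    `left_join_all` is a hypothetical helper; it produces the same chain as
    writing reference.join(df2, ...).join(df3, ...) by hand.
    """
    return reduce(
        # Each step joins the next table against the reference table's key column.
        lambda acc, df: acc.join(df, df[key] == reference[key], "left_outer"),
        others,
        reference,
    )
```

For example, `left_join_all(df1, [df2, df3])` yields the same two-step join as the hand-written version.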

This join operation generates 200 jobs, all but a few of which finish in a couple of seconds. The last job, however, takes extremely long (an hour or so) and (obviously) runs on only one processor. The Spark web UI reveals that this job has received far too many shuffle records.

Why is this happening, and how should I tune Spark to avoid it?




The query "explain select * from ... left outer join ... ... ..." looks as follows:

== Physical Plan ==
Project [id#0, ... rest of the columns (~205) ...]
 HashOuterJoin [id#0], [id#370], LeftOuter, None
  HashOuterJoin [id#0], [id#367], LeftOuter, None
   HashOuterJoin [id#0], [id#364], LeftOuter, None
    ...
   Exchange (HashPartitioning [id#364], 200)
    Project [...cols...]
     PhysicalRDD [...cols...], MapPartitionsRDD[183] at map at newParquet.scala:542
  Exchange (HashPartitioning [id#367], 200)
   Project [...cols...]
    PhysicalRDD [...cols...], MapPartitionsRDD[185] at map at newParquet.scala:542
 Exchange (HashPartitioning [id#370], 200)
  Project [...cols...]
   PhysicalRDD [...cols...], MapPartitionsRDD[187] at map at newParquet.scala:542

There is 1 answer

hyim On

Calling repartition after a join may help.

I ran into a similar situation: I joined two DataFrames with 200 partitions each, then joined the result again and again, and the job never finished.

I added repartition(50) to the DataFrames that were about to be joined, and then it worked.
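
A rough sketch of that workaround, under stated assumptions: the helper name `repartition_then_join` and its parameters are made up for illustration, and 50 is simply the value that happened to work here, not a recommended setting. It uses `DataFrame.repartition(numPartitions)` and the same positional `join` form as in the question.

```python
def repartition_then_join(left, right, key="id", num_partitions=50):
    """Repartition both inputs, then left outer join `right` onto `left`.

    Hypothetical helper; `num_partitions=50` mirrors the value tried above
    and should be tuned for the actual data and cluster.
    """
    left_r = left.repartition(num_partitions)
    right_r = right.repartition(num_partitions)
    # Join on the key column of the repartitioned DataFrames.
    return left_r.join(right_r, right_r[key] == left_r[key], "left_outer")
```

Note that the 200 in the `Exchange (HashPartitioning ..., 200)` lines of the plan is the default value of `spark.sql.shuffle.partitions`, so lowering that setting may be another knob worth trying. Neither change fixes a skewed or oversized input, which was the actual cause in the question.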