How do I make my many-join / many-union datasets compute faster?

I have a series of ~30 datasets that all need to be joined together to make a wide final table. For each table type there are ~5 years of individual tables (one table per year) that get unioned into a full history; these full histories are then joined with the full histories of the other tables (similarly unioned) to make a big, historical, wide table.

The layout of these per-year tables is as follows:

table_type_1:

| primary_key | year |
|-------------|------|
| key_1       | 0    |
| key_2       | 0    |
| key_3       | 0    |

The tables for other years look like this:

table_type_1:

| primary_key | year |
|-------------|------|
| key_1       | 1    |
| key_2       | 1    |

These are then unioned together to create:

table_type_1:

| primary_key | year |
|-------------|------|
| key_1       | 0    |
| key_2       | 0    |
| key_3       | 0    |
| key_1       | 1    |
| key_2       | 1    |

Similarly, a second type of table when unioned results in the following:

table_type_2:

| primary_key | year |
|-------------|------|
| key_1       | 0    |
| key_2       | 0    |
| key_3       | 0    |
| key_1       | 1    |
| key_2       | 1    |

I now want to join table_type_1 with table_type_2 on primary_key and year to yield a much wider table. I notice that this final join takes a very long time and shuffles a lot of data.
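Roughly, the shape of the computation is this (a minimal sketch only; the table names, the SparkSession setup, and the loading logic are illustrative, not my actual code):

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().getOrCreate()

// Illustrative loading: union each table type's per-year tables into one full history.
def unionYears(prefix: String, years: Seq[Int]): DataFrame =
  years.map(y => spark.read.table(s"${prefix}_year_$y")).reduce(_ unionByName _)

val tableType1 = unionYears("table_type_1", 0 to 4)
val tableType2 = unionYears("table_type_2", 0 to 4)

// The slow step: without co-partitioned inputs, Spark shuffles (exchanges) both
// full histories on (primary_key, year) before it can join them.
val wide = tableType1.join(tableType2, Seq("primary_key", "year"))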

How can I make this faster?

There are 2 answers

itIsNaz

What I advise is: first union the small datasets, then broadcast the result of that union. Spark will ship the broadcast dataset to each of its nodes, which reduces the number of shuffles. Unions in Spark are well optimized, so what you have to do is think about the rest of the process: select only the columns you need from the beginning, and avoid non-cost-effective operations such as groupByKey before the union, because Spark will execute those operations when it computes the final result. I also advise you to avoid Hive, because its MapReduce strategy does not compare well to Spark SQL. You can use this example of a function (just change the key), and use Scala if you can, since it interacts directly with Spark:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{broadcast, col}

def map_To_cells(df1: DataFrame, df2: DataFrame): DataFrame = {
  // Copy the join key to a differently named column on the broadcast side
  // so the join condition is unambiguous.
  val df0 = df2.withColumn("key0", col("key")).drop("key")
  df1.as("main").join(
    broadcast(df0),             // hint Spark to broadcast the small, unioned dataset
    df0("key0") <=> df1("key")  // null-safe equality on the join key
  ).select(col("main.*"), df0("key0"))  // keep only the columns you actually need
}
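For example, usage could look like this (the SparkSession, table names, and the rename to "key" are illustrative additions; also note the helper joins on a single key, so for the question's case you would extend the condition to cover year as well):

import org.apache.spark.sql.SparkSession

// Hypothetical usage: rename primary_key to the "key" column the helper expects,
// then broadcast-join the smaller unioned history into the larger one.
val spark = SparkSession.builder().getOrCreate()
val big = spark.read.table("table_type_1").withColumnRenamed("primary_key", "key")
val small = spark.read.table("table_type_2").withColumnRenamed("primary_key", "key")
val joined = map_To_cells(big, small)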
vanhooser

You can bucket the per-year tables on the primary_key and year columns, using exactly the same number of buckets for every table, to avoid an expensive exchange when computing the final join.

- output: table_type_1_year_0
  input: raw_table_type_1_year_0
  hive_partitioning: none
  bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
- output: table_type_1_year_1
  input: raw_table_type_1_year_1
  hive_partitioning: none
  bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
...
- output: table_type_2_year_0
  input: raw_table_type_2_year_0
  hive_partitioning: none
  bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
- output: table_type_2_year_1
  input: raw_table_type_2_year_1
  hive_partitioning: none
  bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
...
- output: all_tables
  input:
    - table_type_1_year_0
    - table_type_1_year_1
...
    - table_type_2_year_0
    - table_type_2_year_1
...
  hive_partitioning: none
  bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)

Note: when picking the BUCKET_COUNT value, it's important to understand that it should be optimized for the final all_tables output, not for the intermediate tables. This means you will likely end up with quite small files for the intermediate tables, but that is inconsequential compared to the efficiency gains on the all_tables output: you won't have to compute a massive exchange when joining everything up, because your buckets are pre-computed and you can simply SortMergeJoin on the input files.
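If you are producing these tables with plain Spark rather than the pipeline spec above, the same idea can be sketched with bucketBy (the table names and bucket count below are illustrative, and saveAsTable needs a Hive-compatible catalog):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
val bucketCount = 200  // illustrative; size it for the final all_tables output

// Write each side of the join bucketed (and sorted) on the join keys.
// Both sides must use the same bucket count for the exchange-free join to apply.
def writeBucketed(inputTable: String, outputTable: String): Unit =
  spark.read.table(inputTable)
    .write
    .bucketBy(bucketCount, "primary_key", "year")
    .sortBy("primary_key", "year")
    .format("parquet")
    .saveAsTable(outputTable)

writeBucketed("table_type_1_all_years", "table_type_1_bucketed")
writeBucketed("table_type_2_all_years", "table_type_2_bucketed")

// With matching bucket layouts on (primary_key, year), Spark can plan a SortMergeJoin
// that reads corresponding buckets directly instead of exchanging both inputs.
val wide = spark.read.table("table_type_1_bucketed")
  .join(spark.read.table("table_type_2_bucketed"), Seq("primary_key", "year"))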

For an explicit example of how to write a transform that writes out a specified number of buckets, my answer over here is probably useful.