How to optimize the join of two DataFrames in PySpark using Dataproc Serverless


I am trying to write a BigQuery partitioned and clustered table using dbt Python. I use PySpark on Dataproc Serverless because I don't want to handle cluster maintenance (some of my teammates are not data engineers, so it helps there too). I have two DataFrames: one is a huge external table of events in Parquet format (I don't know its size since I don't have access to that information, but the file source partition file cache is available to improve query performance), and the other is a reference table, also in Parquet format, stored in GCS. A lot of transformations are applied to the first DataFrame, and then I have to join it with the second one.

When I run my code with 14 days of history there is no problem and the table is created; it is around 700 million rows. But now I need to build the same table with 13 months of history, and the job fails. My first question: given the estimated size of the final table, is it a good idea to store the data in a BigQuery table? Second question: what would be the best approach, considering the recurring warnings/errors below that appear in the output during the run before the job fails (this is just an extract of the full output)?

24/01/25 15:44:00 WARN BlockManagerMasterEndpoint: Error trying to remove broadcast 1 from block manager BlockManagerId(22, 10.132.0.11, 36775, None)
java.io.IOException: Connection from /10.132.0.11:55488 closed
    at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147)
    at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
24/01/25 15:44:00 ERROR TransportRequestHandler: Error sending result RpcResponse[requestId=7593386952163727134,body=NioManagedBuffer[buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]]] to /10.132.0.11:55488; closing connection
io.netty.channel.StacklessClosedChannelException
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)
24/01/25 15:44:00 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /10.132.0.112:58248 is closed
24/01/25 15:44:00 WARN BlockManagerMasterEndpoint: Error trying to remove broadcast 1 from block manager BlockManagerId(30, 10.132.0.112, 36095, None)
java.io.IOException: Connection from /10.132.0.112:58248 closed
    at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147)
    at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
24/01/25 16:27:11 WARN TaskSetManager: Lost task 77.0 in stage 9.0 (TID 4186) (10.132.0.10 executor 53): FetchFailed(BlockManagerId(51, 10.132.0.15, 40611, None), shuffleId=1, mapIndex=190, mapId=4099, reduceId=77, message=
org.apache.spark.shuffle.FetchFailedException
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: java.util.concurrent.TimeoutException: Waited 30000 milliseconds (plus 174630 nanoseconds delay) for SettableFuture@28b5248b[status=PENDING]
24/01/25 19:42:07 WARN TaskSetManager: Lost task 16.0 in stage 9.3 (TID 5435) (10.132.0.150 executor 42): FetchFailed(BlockManagerId(8, 10.132.0.59, 45669, None), shuffleId=1, mapIndex=40, mapId=3949, reduceId=37, message=
org.apache.spark.shuffle.FetchFailedException
    at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:312)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1330)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1034)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:87)
    at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
    at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1505)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: java.util.concurrent.TimeoutException: Waited 30000 milliseconds (plus 114 milliseconds, 386561 nanoseconds delay) for SettableFuture@458acde2[status=PENDING]
    at org.sparkproject.guava.base.Throwables.propagate(Throwables.java:242)

I understand the problem mainly comes from the join between the two tables, but I don't know how to optimize it. I tried to broadcast the biggest table, but it didn't help. Here are the Spark configurations I used:

    spark.reducer.fetchMigratedShuffle.enabled: 'true'
    spark.dynamicAllocation.executorAllocationRatio: '1'
    spark.sql.sources.partitionOverwriteMode: dynamic
    spark.sql.hive.filesourcePartitionFileCacheSize: '1000000000'
    spark.sql.debug.maxToStringFields: '500'
    spark.sql.legacy.timeParserPolicy: 'LEGACY'
    spark.sql.parquet.compression.codec: 'snappy'
    spark.io.compression.codec: 'snappy'
    spark.sql.autoBroadcastJoinThreshold: '-1'
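
In case it helps, these are extra properties I have been considering on top of the above, but I have not validated them for this job (the values are guesses on my part, not settings I currently run with):

    spark.sql.adaptive.enabled: 'true'
    spark.sql.adaptive.skewJoin.enabled: 'true'
    spark.sql.shuffle.partitions: '2000'
    spark.network.timeout: '600s'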

and here is some of my code:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

def model(dbt, spark):

    # dbt model configuration (BigQuery partitioning and clustering)

    dbt.config(
        materialized = 'table',
        cluster_by = ['col_2', 'col_1'],
        partition_by = {
            'field': 'event_day',
            'data_type': 'date',
            'granularity': 'day'
        }
    )

    # Events table: keep only the last 14 days (filter on the partition column)
    df = spark.read.option('mergeSchema', 'true').parquet('file_events')\
            .filter(F.col('event_day') >= F.date_add(F.current_timestamp(), -14))

    # Reference table stored in GCS
    df_ref = spark.read.parquet('file_ref')

    # Join on the reference key, then aggregate
    # (`events` holds my aggregation expressions, omitted from this extract)
    df_results = df.join(F.broadcast(df_ref), 'col_1').groupby('col_2', *df.columns).agg(events)

    return df_results
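
For what it's worth, below is the variant I have been sketching for the 13-month run, but I have not been able to validate it at that scale; the column lists, grouping keys and the count aggregation are placeholders, not my real ones. The idea is to broadcast the small reference table (rather than the big event table) and to select only the columns I actually need before the join, so the events never have to be shuffled for the join itself.

import pyspark.sql.functions as F


def build_results(spark):
    # Read only the columns needed downstream to keep the scanned data small.
    df = (
        spark.read.option('mergeSchema', 'true').parquet('file_events')
        # 13 months of history, still filtering on the partition column.
        .filter(F.col('event_day') >= F.add_months(F.current_date(), -13))
        .select('event_day', 'col_1', 'col_2')  # placeholder column list
    )

    # The reference table is the small side of the join, so broadcast this one.
    df_ref = spark.read.parquet('file_ref').select('col_1')  # placeholder column list

    df_results = (
        df.join(F.broadcast(df_ref), 'col_1')
          .groupBy('event_day', 'col_2')        # placeholder grouping keys
          .agg(F.count('*').alias('events'))    # placeholder aggregation
    )
    return df_results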

Thanks for your help!


There are 0 answers