I am trying to write a BigQuery partitioned and clustered table using a dbt Python model. I run it with PySpark on Dataproc Serverless because I don't want to handle cluster maintenance (it also helps since some of my teammates are not data engineers). I have two dataframes: one is a huge external table of events in Parquet format (I don't know its size since I don't have access to that information, but the file source partition file cache is enabled to improve query performance), and the other is a reference table, also in Parquet format, stored in GCS. There are a lot of transformations on the first dataframe, and then I have to join it with the second one.
When I run my code with 14 days of history, there is no problem and the table is created; it is around 700 million rows. But now I need to create the same table with 13 months of history, and the job fails. Extrapolating linearly, that would be on the order of 20 billion rows (rough estimate after the log extract below).
My first question: given the estimated size of the final table, is it a good idea to store the data in a BigQuery table? My second question: what would be the best approach, considering the recurring warnings/errors that appear in the output during the job run before it fails (this is just an extract of the full output):
24/01/25 15:44:00 WARN BlockManagerMasterEndpoint: Error trying to remove broadcast 1 from block manager BlockManagerId(22, 10.132.0.11, 36775, None)
java.io.IOException: Connection from /10.132.0.11:55488 closed
at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147)
at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
24/01/25 15:44:00 ERROR TransportRequestHandler: Error sending result RpcResponse[requestId=7593386952163727134,body=NioManagedBuffer[buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]]] to /10.132.0.11:55488; closing connection
io.netty.channel.StacklessClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)
24/01/25 15:44:00 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /10.132.0.112:58248 is closed
24/01/25 15:44:00 WARN BlockManagerMasterEndpoint: Error trying to remove broadcast 1 from block manager BlockManagerId(30, 10.132.0.112, 36095, None)
java.io.IOException: Connection from /10.132.0.112:58248 closed
at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147)
at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
24/01/25 16:27:11 WARN TaskSetManager: Lost task 77.0 in stage 9.0 (TID 4186) (10.132.0.10 executor 53): FetchFailed(BlockManagerId(51, 10.132.0.15, 40611, None), shuffleId=1, mapIndex=190, mapId=4099, reduceId=77, message=
org.apache.spark.shuffle.FetchFailedException
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: java.util.concurrent.TimeoutException: Waited 30000 milliseconds (plus 174630 nanoseconds delay) for SettableFuture@28b5248b[status=PENDING]
24/01/25 19:42:07 WARN TaskSetManager: Lost task 16.0 in stage 9.3 (TID 5435) (10.132.0.150 executor 42): FetchFailed(BlockManagerId(8, 10.132.0.59, 45669, None), shuffleId=1, mapIndex=40, mapId=3949, reduceId=37, message=
org.apache.spark.shuffle.FetchFailedException
at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:312)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1330)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1034)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:87)
at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1505)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: java.util.concurrent.TimeoutException: Waited 30000 milliseconds (plus 114 milliseconds, 386561 nanoseconds delay) for SettableFuture@458acde2[status=PENDING]
at org.sparkproject.guava.base.Throwables.propagate(Throwables.java:242)
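To put my first question in context, here is the back-of-the-envelope extrapolation I mention above. It assumes daily volume is roughly constant, which I cannot verify since I don't know the size of the source:

# Rough size estimate (assumption: roughly constant daily volume)
rows_14_days = 700_000_000
days_13_months = 13 * 30                        # ~390 days
rows_13_months = rows_14_days / 14 * days_13_months
print(f"{rows_13_months:,.0f}")                 # about 19,500,000,000 rows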
I understand the problem mainly comes from the join between the two tables, and I don't know how to optimize it. I tried broadcasting the biggest table, but it didn't help. Here are the Spark configurations I used:
spark.reducer.fetchMigratedShuffle.enabled: 'true'
spark.dynamicAllocation.executorAllocationRatio: '1'
spark.sql.sources.partitionOverwriteMode: dynamic
spark.sql.hive.filesourcePartitionFileCacheSize: '1000000000'
spark.sql.debug.maxToStringFields: '500'
spark.sql.legacy.timeParserPolicy: 'LEGACY'
spark.sql.parquet.compression.codec: 'snappy'
spark.io.compression.codec: 'snappy'
spark.sql.autoBroadcastJoinThreshold: '-1'
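For reference, dbt submits the serverless batch for me, so I don't create it by hand, but these properties end up in the batch runtime configuration. A minimal sketch of an equivalent manual submission with the google-cloud-dataproc client would look roughly like this (project, region, bucket, and file names below are placeholders, not my real setup):

from google.cloud import dataproc_v1

# Sketch only: dbt handles the actual submission; this just shows where the
# properties listed above end up (Dataproc Serverless batch runtime properties).
client = dataproc_v1.BatchControllerClient(
    client_options={'api_endpoint': 'europe-west1-dataproc.googleapis.com:443'}
)
batch = dataproc_v1.Batch(
    pyspark_batch=dataproc_v1.PySparkBatch(
        main_python_file_uri='gs://my-bucket/model.py'   # placeholder
    ),
    runtime_config=dataproc_v1.RuntimeConfig(
        properties={
            'spark.sql.sources.partitionOverwriteMode': 'dynamic',
            'spark.sql.autoBroadcastJoinThreshold': '-1',
            # ... plus the rest of the properties listed above
        }
    ),
)
operation = client.create_batch(
    request=dataproc_v1.CreateBatchRequest(
        parent='projects/my-project/locations/europe-west1',   # placeholder
        batch=batch,
        batch_id='events-backfill',
    )
)
operation.result()  # wait for the batch to finish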
and here is some of my code:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
def model(dbt, spark):
    # Session configurations
    dbt.config(
        materialized='table',
        cluster_by=['col_2', 'col_1'],
        partition_by={
            'field': 'event_day',
            'data_type': 'date',
            'granularity': 'day'
        }
    )

    df = (
        spark.read.option('mergeSchema', 'true').parquet('file_events')
        .filter(F.col('event_day') >= F.date_add(F.current_timestamp(), F.lit(-14)))
    )
    df_ref = spark.read.parquet('file_ref')

    # `events` holds the aggregation expressions (omitted here)
    df_results = (
        df.join(F.broadcast(df_ref), 'col_1')
        .groupby('col_2', *df.columns)
        .agg(events)
    )
    return df_results
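For the failing 13-month run, the only thing that changes in the model is the lookback window in that first filter; it looks roughly like this (sketch, using add_months as my shorthand for "13 months of history"):

# 13-month backfill variant of the filter above (sketch)
df = (
    spark.read.option('mergeSchema', 'true').parquet('file_events')
    .filter(F.col('event_day') >= F.add_months(F.current_date(), -13))
)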
Thanks for your help!