I am new to PySpark, so I hope someone can help. I am trying to read Parquet files stored in a GCP bucket. The files are partitioned by date, for example bucket-name/year={}/month={}/day={}
For a given file we have the below schema description:
- Until March we used to have columns x and y in float data type
- Since March those 2 columns are now in double data type
From what I can see, PySpark has no issue treating float and double as compatible data types. (Similar examples I found online for this error involved incompatible data types, e.g. string and float.) However, we are facing a weird issue when we try to read all the available data for this file:
#i.e. read all the data we have ever received for this file
path = 'bucket-name/year=*/month=*/day=*'
df = spark.read.format('parquet').load(path)
df.cache().count()
we get the error below.
(Please note that we don't get this error if we just call df.count(); we only hit it if we cache first.)
In addition, the schema resulting from spark.read reports the data type of column x as float. So schema-wise, Spark is happy to read the data and says the dtype is float. However, as soon as we cache, things go wrong.
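One way to confirm which partitions were written with which type is to read each partition path on its own and compare the type Spark reports for the column. The sketch below is only an illustration: the helper name group_paths_by_dtype is hypothetical, and it assumes a list of partition paths is already available and that `spark` is an existing SparkSession.

```python
from collections import defaultdict


def group_paths_by_dtype(spark, paths, column):
    """Group paths by the type Spark infers for `column` when each path is read alone.

    `spark` is assumed to be an existing SparkSession; `paths` is a list of
    partition paths (or globs) supplied by the caller.
    """
    groups = defaultdict(list)
    for path in paths:
        # DataFrame.dtypes is a list of (column_name, type_string) tuples,
        # e.g. [("x", "float"), ("y", "float")]
        dtypes = dict(spark.read.parquet(path).dtypes)
        groups[dtypes.get(column)].append(path)
    return dict(groups)
```

If the result has both a "float" and a "double" key for column x, the two groups of paths were written with different physical types, which matches the schema change in March described above.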
Hope the details of the situation are clear enough :)
An error occurred while calling o923.count. :
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 15 in stage 41.0 failed 4 times, most recent failure: Lost task
15.3 in stage 41.0 (TID 13228, avroconversion-validation-w-1.c.vf-gned-nwp-live.internal, executor
47): java.lang.UnsupportedOperationException:
org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
at
org.apache.parquet.column.Dictionary.decodeToFloat(Dictionary.java:53)
at
org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToFloat(ParquetDictionary.java:41)
at
org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getFloat(OnHeapColumnVector.java:423)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source) at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
at
org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anon$1.hasNext(InMemoryRelation.scala:125)
at
org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
at
org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
at
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1165)
at
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
at
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
at
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) at
org.apache.spark.rdd.RDD.iterator(RDD.scala:308) at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:359)
at
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1165)
at
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
at
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
at
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) at
org.apache.spark.rdd.RDD.iterator(RDD.scala:308) at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123) at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
According to the documentation, cache() is a lazy operation, and if you look at the MEMORY_ONLY section you will notice that cache() tries to store the RDD/DataFrame as deserialized Java objects in the JVM (once you call an action on the cached RDD/DataFrame). So you have a problem deserializing the objects in your RDD/DataFrame. The trace points the same way: the cache builder ends up calling decodeToFloat on a PlainDoubleDictionary, i.e. values stored as double are being read as float. I suggest trying a transformation such as map() to check whether serialization/deserialization works. If you call df.count() without any transformation on df, Spark doesn't deserialize your objects, which is why that call succeeds.
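A possible workaround, sketched under assumptions rather than tested against this data: read the float-era and double-era partitions separately, cast x and y to double in each, and union the results before caching. The helper name read_as_double and its parameters are hypothetical. It assumes every entry in `paths` covers files that all use the same type for those columns, and that `paths` is not empty. The basePath reader option is passed so that Spark still derives the year/month/day partition columns when a sub-path is read.

```python
from functools import reduce


def read_as_double(spark, paths, columns=("x", "y"), base_path=None):
    """Read each path separately, cast `columns` to double, and union the results.

    Assumes each entry in `paths` only covers files with a single type for
    `columns`, so Spark infers a schema that matches the files it reads.
    """
    frames = []
    for path in paths:
        reader = spark.read
        if base_path is not None:
            # Keep the partition columns (year/month/day) when reading a sub-path
            reader = reader.option("basePath", base_path)
        df = reader.parquet(path)
        for name in columns:
            # Casting float -> double is a widening cast; double -> double is a no-op
            df = df.withColumn(name, df[name].cast("double"))
        frames.append(df)
    # unionByName matches columns by name rather than by position
    return reduce(lambda left, right: left.unionByName(right), frames)
```

With the two groups of paths in hand, calling read_as_double(spark, [old_paths_glob, new_paths_glob], base_path='bucket-name') and then .cache().count() on the result would be the thing to try, where old_paths_glob and new_paths_glob stand for whatever globs select the pre-March and post-March partitions.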