I am new to pyspark, so I hope someone can help. I am trying to read parquet files stored in a GCP bucket. The files are partitioned by date, for example bucket-name/year={}/month={}/day={}.

For a given file, the schema has changed over time:

  1. Until March, columns x and y were stored as float
  2. Since March, those two columns are stored as double

From what I can see, pyspark has no issue treating float and double as compatible data types (the similar examples I found online for this error involved genuinely incompatible types, e.g. string and float). However, we are facing this weird issue where, if we try to read in all the available data for this file:

# i.e. read all the data we have ever received for this file
path = 'bucket-name/year=*/month=*/day=*'

df = spark.read.format('parquet').load(path)
df.cache().count()

we get the error below. (Please note that we don't get this error with a plain df.count(); it only appears if we cache first.)

Adding to that, the schema resulting from spark.read reports the data type of column x as float. So schema-wise, Spark is happy to read the data and says the dtype is float. However, if we cache, things go bad.
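
For reference, here is a minimal sketch of how the per-partition type difference could be confirmed. The partition paths below are hypothetical; substitute partitions that actually exist in the bucket.

# Hypothetical partitions, one from before March and one from after
old_part = spark.read.parquet('bucket-name/year=2021/month=02/day=01')
new_part = spark.read.parquet('bucket-name/year=2021/month=03/day=01')

old_part.select('x', 'y').printSchema()   # x, y reported as float
new_part.select('x', 'y').printSchema()   # x, y reported as double

# Schema of the combined read from above, which reports x as float
df.printSchema()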

Hope the details of the situation are clear enough :)

An error occurred while calling o923.count.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 15 in stage 41.0 failed 4 times, most recent failure: Lost task 15.3 in stage 41.0 (TID 13228, avroconversion-validation-w-1.c.vf-gned-nwp-live.internal, executor 47): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
    at org.apache.parquet.column.Dictionary.decodeToFloat(Dictionary.java:53)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToFloat(ParquetDictionary.java:41)
    at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getFloat(OnHeapColumnVector.java:423)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anon$1.hasNext(InMemoryRelation.scala:125)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1165)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:359)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1165)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

1 Answer

badger:

According to the documentation:

The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory)
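
For illustration, the equivalence the quote describes can be spelled out with an explicit persist() call. This is just a minimal sketch on a throwaway RDD; note that DataFrame.cache() may use a different default storage level depending on the Spark version.

from pyspark import StorageLevel

rdd = spark.sparkContext.parallelize(range(10))
rdd.persist(StorageLevel.MEMORY_ONLY)   # what cache() does for an RDD
rdd.count()                             # persist/cache is lazy; the action triggers the actual storage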

cache() is a lazy operation, and if you look at the MEMORY_ONLY section you'll notice that cache() tries to store the RDD/DataFrame as deserialized Java objects in the JVM [once you call an action on the cached RDD/DataFrame], so you have a problem in the deserialization of the objects in your RDD/DataFrame. I suggest performing some transformation like map() to check whether serialization/deserialization works well or not.

If you call df.count() without any transformation on df, Spark doesn't deserialize your objects.
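
A minimal sketch of the map() check suggested above, assuming df is the DataFrame from the question; forcing a row-level transformation makes Spark materialize the column values instead of answering count() without touching them:

# Force every value of x and y to be read and converted
check = df.rdd.map(lambda row: (row['x'], row['y']))
print(check.count())

If this fails with the same UnsupportedOperationException, the problem shows up whenever the column values actually have to be materialized, which a plain df.count() avoids.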