Spark: error reading DateType columns in partitioned parquet data

I have parquet data in S3 partitioned by nyc_date in the format s3://mybucket/mykey/nyc_date=Y-m-d/*.gz.parquet.

I have a DateType column event_date that, for some reason, throws this error when I try to read from S3 and write to HDFS on EMR.

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
df = spark.read.parquet('s3a://mybucket/mykey/') 

df.limit(100).write.parquet('hdfs:///output/', compression='gzip')

Error:

java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary
    at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:48)
    at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt(OnHeapColumnVector.java:233)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Here's what I figured out:

  • Local works :-): I copied some of the data locally in the same format and can query it fine.
  • Avoiding event_date works :-): Selecting all 50+ columns except event_date doesn't cause any errors (see the sketch after this list).
  • Explicit read path still throws the error :-(: Changing the read path to 's3a://mybucket/mykey/*/*.gz.parquet' still throws the error.
  • Specifying the schema still throws the error :-(: Specifying the schema before loading still causes the same error.
  • I can load the data, including event_date, into a data warehouse :-).
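
For reference, the "avoid event_date" workaround from the list above looks roughly like this (a sketch; the paths match my example above, and the column handling stands in for my actual 50+ columns):

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Read the partitioned parquet from S3 as before.
df = spark.read.parquet('s3a://mybucket/mykey/')

# Selecting every column except the DateType column avoids the exception.
cols_without_event_date = [c for c in df.columns if c != 'event_date']
df.select(cols_without_event_date) \
  .limit(100) \
  .write.parquet('hdfs:///output/', compression='gzip')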

It's really weird that this causes an error only for a DateType column; I don't have any other DateType columns.

Using Spark 2.0.2 and EMR 5.2.0.

There are 3 answers

Kamil Sindi (accepted answer):

I just used StringType instead of DateType when writing the parquet. I don't have the issue anymore.
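
Roughly like this in the job that generates the parquet (a sketch; the '<...>' paths are placeholders):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Cast the DateType column to a plain string before writing, so the
# resulting parquet no longer contains a DateType column at all.
df = spark.read.parquet('<input-path>')
df.withColumn('event_date', F.col('event_date').cast('string')) \
  .write.parquet('<output-path>', compression='gzip')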

maholt:

I know I'm late to the party...

I had a similar issue: I read several parquet directories, unioned them, and tried to write the result.

My fix was to add a .select(...) before the write.
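
Something along these lines (a sketch; the paths and column names are placeholders for my actual ones):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the parquet directories and union them (assumes matching schemas;
# the '<...>' paths are placeholders).
df = spark.read.parquet('<dir-1>').union(spark.read.parquet('<dir-2>'))

# Listing the columns explicitly in a select() before the write avoided the error.
df.select('col_a', 'col_b', 'event_date') \
  .write.parquet('<output-dir>')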

Vlad.Bachurin:

I hit this exception when Spark was reading a Parquet file that had been generated from a JSON file.

TLDR: If possible, re-write the input Parquet with the expected schema forcefully applied.

Scala code below. Python won't be too different.

This is pretty much what my Parquet generation looked like at first:

spark.read
  .format("json")
  .load("<path-to-json-file>.json")
  .write
  .parquet("<path-to-output-directory>")

But the Spark job that read the above Parquet enforced a schema on the input, roughly like this:

import org.apache.spark.sql.types._

val structType: StructType = StructType(fields = Seq(...))
spark.read.schema(structType)

And that read is basically where the exception occurs.

FIX: To fix the exception, I had to forcefully apply the expected schema when generating the Parquet:

spark.read
  .schema(structType) // <===
  .format("json")
  .load("<path-to-json-file>.json")
  .write
  .parquet("<path-to-output-directory>")
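
A rough PySpark equivalent of that fix (the schema and '<...>' paths are placeholders, not my real ones):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.getOrCreate()

# Placeholder schema; use whatever StructType the downstream reader enforces.
struct_type = StructType([
    StructField('some_int', IntegerType()),
    StructField('some_str', StringType()),
])

# Apply the expected schema while reading the JSON, so the generated parquet
# is written with exactly those types.
spark.read.schema(struct_type) \
    .format('json') \
    .load('<path-to-json-file>.json') \
    .write.parquet('<path-to-output-directory>')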

To my understanding, the reason for the exception in my case was not (only) the StringType -> DateType conversion, as it was for @kamil-sindi.

It was also the fact that, when reading JSON, Spark assigns LongType to all integer values, so my Parquet was saved with LongType fields.

And the Spark job reading that Parquet, presumably, struggled to convert LongType to IntegerType.
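
As an illustration of that mismatch, a toy version (a sketch with placeholder paths and field names, not my actual job) would be something like:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

spark = SparkSession.builder.getOrCreate()

# JSON integers are inferred as LongType, so this parquet stores an int64 column.
spark.read.json('<path-to-json-file>.json') \
    .write.parquet('<path-to-parquet>')

# Reading that parquet back with an enforced IntegerType asks Spark to decode an
# int64 parquet column as int32; that kind of mismatch can surface as the
# UnsupportedOperationException above.
schema = StructType([StructField('some_numeric_field', IntegerType())])
spark.read.schema(schema).parquet('<path-to-parquet>').show()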