How do I read a Large JSON Array File in PySpark


Issue

I recently encountered a challenge in Azure Data Lake Analytics when I attempted to read in a large UTF-8 JSON array file, so I switched to HDInsight PySpark (v2.x, not 3) to process the file. The file is ~110 GB and has ~150 million JSON objects.

HDInsight PySpark does not appear to support the array-of-JSON file format for input, so I'm stuck. Also, I have "many" such files, each with a different schema and hundreds of columns, so creating the schemas by hand is not an option at this point.

Question

How do I use out-of-the-box functionality in PySpark 2 on HDInsight to enable these files to be read as JSON?

Thanks,

J

Things I tried

I used the approach at the bottom of this page from Databricks, which supplied the code snippet below:

import json

df = sc.wholeTextFiles('/tmp/*.json').flatMap(lambda x: json.loads(x[1])).toDF()
display(df)

I tried the above, not understanding that wholeTextFiles reads each file into memory as a single record, and of course ran into OutOfMemory errors that quickly killed my executors.
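For reference, wholeTextFiles returns one (path, contents) pair per file, which is why it falls over on a file this size (path hypothetical):

# each record is (filename, entire_file_contents)
pairs = sc.wholeTextFiles('/tmp/*.json')

# this materializes one whole file as a single string on one executor;
# with a ~110 GB file, that is the OutOfMemory killer
path, content = pairs.first()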

I attempted loading to an RDD and other read methods, but PySpark appears to support only the JSON Lines file format, while I have an array of JSON objects due to ADLA's requirement for that output format.
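To make the mismatch concrete: the reader expects JSON Lines (one complete object per line), while ADLA writes a single array. A sketch, with hypothetical paths and data:

# JSON Lines -- what PySpark's json reader expects:
#   {"id": 1, "name": "a"}
#   {"id": 2, "name": "b"}
#
# JSON array -- what ADLA emits (the reader treats it as corrupt):
#   [{"id": 1, "name": "a"},
#    {"id": 2, "name": "b"}]

df = sqlContext.read.json('/data/jsonlines/*.json')  # works only for JSON Lines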

I tried reading the file in as text, stripping the array characters, splitting on the JSON object boundaries, and converting to JSON as above, but that kept producing errors about being unable to convert unicode and/or str.
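My best guess at the cause: in Python 2, textFile yields unicode lines, and once a line is encoded to bytes, passing unicode arguments to its string methods forces an implicit ASCII decode. A minimal reproduction (values hypothetical):

line = u'{"id": 1},\ufeff'   # textFile yields unicode
raw = line.encode('utf-8')   # now bytes; the BOM became '\xef\xbb\xbf'
raw.strip(u',\ufeff')        # str.strip(unicode) decodes self as ASCII:
                             # UnicodeDecodeError on byte 0xef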

I found a way through the above and converted to a dataframe with one column whose rows were the JSON objects as strings. However, I did not find a way to output only the JSON strings from the dataframe rows to a file by themselves. They always came out as:

{'dfColumnName':'{...json_string_as_value}'}
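In hindsight, one way to get the bare strings out would have been to save the underlying RDD values instead of the Rows; a sketch, using the hypothetical column name from the output above:

# pull the raw JSON string out of each Row, dropping the Row wrapper
json_strings = df.rdd.map(lambda row: row['dfColumnName'])
json_strings.saveAsTextFile('/an/output/path')  # hypothetical path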

I also tried a map function that accepted the above rows, parsed them as JSON, extracted the values (the JSON I wanted), then parsed those values as JSON. This appeared to work, but when I tried to save, the RDD was of type PipelineRDD and did not appear to have a saveAsTextFile() method. I then tried the toJSON method, but kept getting errors about "found no valid JSON Object", which admittedly I did not understand, along with other conversion errors.


1 Answer

jatal (Best Answer)

I finally found a way forward. I learned that I could read JSON directly from an RDD, including a PipelineRDD. I found a way to remove the unicode byte-order marker and the wrapping array square brackets, split the JSON objects on a fortunate delimiter, and end up with a distributed dataset for more efficient processing. The resulting dataframe has columns named after the JSON elements, infers the schema, and dynamically adapts to other file formats.

Here is the code - hope it helps!

#...Spark considers arrays of JSON objects to be an invalid input format,
#   and unicode files are prefixed with a byte-order marker (BOM)
#
partitions = 2000  # assumed value; tune the partition count to your cluster

# strip the BOM, the wrapping [ ] brackets, and trailing commas from each
# line, leaving one bare JSON object per record; stripping the unicode
# string directly (no .encode()) avoids Python 2 str/unicode mixing errors
thanksMoiraRDD = sc.textFile('/a/valid/file/path', partitions).map(
    lambda x: x.strip(u",\r\n[]\ufeff")
).filter(lambda x: x)  # drop lines that were only brackets or the BOM

df = sqlContext.read.json(thanksMoiraRDD)
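To sanity-check the result and persist it in a form Spark can re-read directly, something like this should work (output path hypothetical):

df.printSchema()   # columns were inferred from the JSON element names
print(df.count())  # should be on the order of the ~150 million objects

# write back out as JSON Lines, which read.json handles natively next time
df.write.json('/a/valid/output/path')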