pyspark json read to tag bad records


I want to use pyspark to parse files with json data and would like to tag 'bad/unexpected' records. By 'bad/unexpected' records I mean those which do not follow the schema I specify. I have this input file and want to apply the schema below. It works when the data is in the expected format per the schema (inp1.json). It does not work when the data is not in the correct format (inp2.json): in that case it just reads the entire file/dataframe as null. What I want is to treat only that one record as a corrupt record and still read the other 3 rows. Any suggestions, please?

inp1.json  (data in correct format)


[{"last_name": ["ln1", ""], "city": ["c1", "c2"]},
{"last_name": ["ln3", "ln4"], "city": ["c10", "c20"]},
{"last_name": ["ln2"], "city": ["c1", "c2"]}]

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Expected schema: both columns are arrays of strings.
myschema = StructType([
    StructField('city', ArrayType(StringType(), True), True),
    StructField('last_name', ArrayType(StringType(), True), True)
])

inp_file = "inp1.json"
spark = SparkSession.builder.appName("read_json").config("spark.some.config.option", "some-value").enableHiveSupport().getOrCreate()
raw_df = spark.read.json(inp_file, multiLine=True, schema=myschema)
print("raw_df")
raw_df.show(truncate=False)


raw_df
+----------+----------+
|city      |last_name |
+----------+----------+
|[c1, c2]  |[ln1, ]   |
|[c10, c20]|[ln3, ln4]|
|[c1, c2]  |[ln2]     |
+----------+----------+

Sample run for data with a bad record

inp2.json  (data in incorrect format; note that last_name in the last record is not an array, but just a string)

[{"last_name": ["ln1", ""], "city": ["c1", "c2"]},
{"last_name": ["ln3", "ln4"], "city": ["c10", "c20"]},
{"last_name": ["ln2"], "city": ["c1", "c2"]},{"last_name": "ln4", "city": ["c4", "c5"]}]


raw_df
+----+---------+
|city|last_name|
+----+---------+
|null|null     |
+----+---------+

1 Answer

Manoj Singh

You can specify the mode=DROPMALFORMED option when reading the JSON; records that do not match the schema are then dropped and the remaining rows are read.

raw_df = spark.read.option('mode', 'DROPMALFORMED').json(inp_file, multiLine=True, schema=myschema)

https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/sql/DataFrameReader.html#json-scala.collection.Seq-
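If the goal is to tag the bad record rather than silently drop it, a possible alternative (a sketch, not verified against these exact files) is PERMISSIVE mode combined with the columnNameOfCorruptRecord option: add a string column to the schema and Spark puts the raw text of any record it cannot parse into that column. Note that with multiLine=True some Spark versions report the parse failure at file level rather than per record, so it is worth testing on your data.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

spark = SparkSession.builder.appName("read_json").getOrCreate()

# Same schema as before, plus a string column that will hold the raw
# text of any record that does not match the expected types.
myschema = StructType([
    StructField('city', ArrayType(StringType(), True), True),
    StructField('last_name', ArrayType(StringType(), True), True),
    StructField('_corrupt_record', StringType(), True)
])

raw_df = (spark.read
          .option('mode', 'PERMISSIVE')
          .option('columnNameOfCorruptRecord', '_corrupt_record')
          .json('inp2.json', multiLine=True, schema=myschema))

# Split the good rows from the tagged bad rows.
good_df = raw_df.filter(raw_df['_corrupt_record'].isNull()).drop('_corrupt_record')
bad_df = raw_df.filter(raw_df['_corrupt_record'].isNotNull())
good_df.show(truncate=False)
bad_df.show(truncate=False)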