Skipping a Bad Record in Python Spark Pyspark Databricks Unknown Field Exception

165 views Asked by At

I am wondering if someone might know how to skip a record that we are getting from a json file

Here is the error

[UNKNOWN_FIELD_EXCEPTION.NEW_FIELDS_IN_RECORD_WITH_FILE_PATH] Encountered unknown fields during parsing: Here is the code that is failing

sent = spark.readStream.format('cloudFiles') \
  .option('cloudFiles.format', 'json') \
  .option('multiline', 'true') \
  .option('cloudFiles.inferColumnTypes', 'true') \
  .option('cloudFiles.schemaLocation', checkpoint_path) \
  .load(raw_files) \
  .withColumn('load_ts', F.current_timestamp()) \
  .writeStream \
  .format('delta') \
  .option('checkpointLocation', checkpoint_path) \
  .trigger(availableNow=True) \
  .option('mergeSchema', 'true') \
  .toTable(b_write_path)

Thanks!

I have not seen any documentation on how to fix this error.

1

There are 1 answers

1
Alex Ott On BEST ANSWER

This depends on what do you want to do with that data. By default, Databricks Autoloader uses the addNewColumns mode, which fails stream when encounters the new columns, but after restart it will handle them correctly.

You can use either rescue or none as schema evolution mode, like this.

.option("cloudFiles.schemaEvolutionMode", "rescue")

In rescue mode, data for new columns will be put into a so-called "rescue column" which you can analyze if necessary, and the process won't fail.

In none mode, new columns are ignored and the process won't fail.

See docs for more details.