Delta Live Table ignoring the defined schema

363 views Asked by At

Using autoloader, I am reading some continues data from storage to Databricks Delta Live table. The declaration of data pipeline is as follows.

import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
    
sch = "StructType([StructField('Date', StringType(), True), StructField('machine', StringType(), True), StructField('temperature', DecimalType(), True), StructField('time', StringType(), True)])" 
    
    @dlt.create_table(
      comment="The raw machine data, ingested from azure storage.",
      table_properties={
        "myCompanyPipeline.quality": "raw",
        "pipelines.autoOptimize.managed": "true"
      }
    )
    
    def test_raw():
      return (spark.readStream.format("cloudFiles").option("schema",sch).option("cloudFiles.schemaLocation", "/FileStore/schema").option("cloudFiles.format", "json").load("..../"))

And dataset I am reading from storage as below.

{"Date":"2023-10-16","time":"12:00:00","machine":"Machine1","temperature":"23.50"}
{"Date":"2023-10-16","time":"12:00:01","machine":"Machine2","temperature":"...corrupt temp..."}
{"Date":"2023-10-16","time":"12:00:02","machine":"Machine3","temperature":"27.50"}

But unfortunately, the pipeline is not failing for wrong "temperature" data (Non Decimal) and pipeline is processing all records successfully. Ideally this should get failed because temperature column is defined as Decimal data type.

Can someone please help, why this schema enforcement not working.

2

There are 2 answers

0
abhijit nag On BEST ANSWER

The problem has been resolved after applying

spark.readStream.format("cloudFiles").schema(sch)

in place of

spark.readStream.format("cloudFiles").option("schema",sch)
2
JayashankarGS On

But unfortunately, the pipeline is not failing for wrong "temperature" data (Non Decimal) and pipeline is processing all records successfully. Ideally this should get failed because temperature column is defined as Decimal data type.

You won't get error whenever there is mismatch between schema and data type, it simply makes it has null when there is type mismatch.

Note: It makes null only when you are running pipeline first time. If you already having table with column of different type and provided schema is of different type then you will get error as below.

enter image description here

For json file type it takes everything as string if you don't provide the schema properly.

That is you provided schema in option("schema",sch) instead of schema(sch)

enter image description here

So, you won't get any error and it takes everything as string.