Unable to get the correct schema using schema registry

I am using the from_avro function in PySpark to read Avro-encoded data from Kafka, with the schema resolved through a schema registry. However, I am encountering an issue where the schema registry integration does not account for schema changes during batch processing: it resolves the latest schema once, when the streaming job starts, and ignores any schema changes that occur while the job is running. Ideally, schema resolution should honor the schema ID embedded in each record, i.e. the 4-byte ID that follows the magic byte in the first 5 bytes of the Confluent wire format.
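
For reference, here is a minimal sketch of the wire format that framing implies (the payload bytes below are made up for illustration):

# Confluent wire format: byte 0 is a magic byte (0x00), bytes 1-4 hold the
# big-endian schema ID, and everything from byte 5 onward is the Avro body.
payload = bytes([0x00, 0x00, 0x00, 0x00, 0x2A]) + b"<avro body>"
schema_id = int.from_bytes(payload[1:5], "big")  # -> 42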

data_df = (
    spark.readStream.format("kafka")
    .option("kafka.ssl.endpoint.identification.algorithm", "")
    .option("kafka.security.protocol", "SSL")
    .option("kafka.bootstrap.servers", servers_details)
    .option("kafka.ssl.truststore.location", location)
    .option("kafka.ssl.truststore.password", pwd)
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .option("maxOffsetsPerTrigger", 30)
    .option("subscribe", name)
    .load()
)

transform_df = (
    data_df.withColumn(
        "record",
        from_avro(
            fn.col("value"),
            schemaRegistryAddress="http://schema-registry.com",
            subject=f"{topic_name}-value",
        ),
    )
    # SQL substring is 1-based, so bytes 2-5 are the 4 schema-ID bytes
    # that follow the magic byte
    .withColumn("schema_id", function_convert(fn.expr("substring(value, 2, 4)")))
    .select("schema_id", fn.col("record"))
)
display(transform_df)
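
Here function_convert is a small helper (not shown above) that decodes those 4 schema-ID bytes into an integer; a minimal sketch of such a helper:

from pyspark.sql.types import IntegerType

# Hypothetical helper: decode the 4 schema-ID bytes as a big-endian integer
function_convert = fn.udf(lambda b: int.from_bytes(bytes(b), "big"), IntegerType())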

I tried passing options to from_avro, but it does not seem to work:

transform_df = df.withColumn(
    "record",
    from_avro(
        fn.col("value"),
        options={"confluent.value.schema.validation": "true"},
        schemaRegistryAddress="http://schema-registry.com",
        subject=f"{topic_name}-value",
    ),
).select(fn.col("record").alias("RECORD_CONTENT"))

1 Answer

connecttopawan:

It looks like you're using the from_avro function in PySpark to read Avro data from Kafka and running into schema changes not being picked up during batch processing. Unfortunately, from_avro in PySpark does not resolve the writer schema per record from the schema ID embedded in the first 5 bytes of the payload, and the confluent.value.schema.validation option you tried does not address this scenario.

However, you can work around this issue by manually resolving the Avro schema using the schema registry and the schema ID present in the first 5 bytes of the Avro data. Here's a possible approach to achieve this:

import io
import json

import avro.io
import avro.schema
import requests
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql.types import StringType

# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()

# Cache fetched schemas so the registry is not queried once per record
_schema_cache = {}

# Fetch the writer schema from the schema registry by its global schema ID.
# Note: global schema IDs are resolved via /schemas/ids/{id}; per-subject
# version numbers are a different namespace and cannot be used here.
def fetch_schema_from_registry(schema_registry_url, schema_id):
    if schema_id not in _schema_cache:
        response = requests.get(f"{schema_registry_url}/schemas/ids/{schema_id}")
        response.raise_for_status()
        _schema_cache[schema_id] = response.json()["schema"]
    return _schema_cache[schema_id]

# Parse one Confluent-framed Avro payload: byte 0 is the magic byte,
# bytes 1-4 are the big-endian schema ID, and the rest is the Avro body.
def parse_avro_data(avro_data, schema_registry_url):
    avro_data = bytes(avro_data)
    schema_id = int.from_bytes(avro_data[1:5], "big")
    avro_schema_str = fetch_schema_from_registry(schema_registry_url, schema_id)
    avro_schema = avro.schema.parse(avro_schema_str)
    reader = avro.io.DatumReader(avro_schema)
    decoder = avro.io.BinaryDecoder(io.BytesIO(avro_data[5:]))
    # Return JSON so the UDF has a simple, fixed return type
    return json.dumps(reader.read(decoder), default=str)

# UDF to parse Avro data and apply per-record schema resolution
parse_avro_udf = fn.udf(
    lambda value: parse_avro_data(value, "http://schema-registry.com"),
    StringType(),
)

data_df = (
    spark.readStream.format("kafka")
    # ... other Kafka options ...
    .load()
)

transform_df = data_df.withColumn("record", parse_avro_udf(fn.col("value")))

# Continue with your processing on `transform_df` as needed

In this approach, we define a UDF (parse_avro_udf) that takes the raw Avro payload, reads the schema ID from bytes 1-4 (byte 0 is the Confluent magic byte), fetches the corresponding writer schema from the schema registry, and then decodes the Avro body using that schema. The fetch_schema_from_registry function resolves the schema by its global ID via the registry's /schemas/ids/{id} endpoint and caches the result so the registry is not queried once per record.

By using this UDF, you should be able to handle schema changes during batch processing, as it resolves the correct writer schema for each record based on the schema ID embedded in the Avro payload.
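
As a quick sanity check (a sketch, assuming a console sink is acceptable in your environment), you can stream the decoded JSON to the console and confirm that records written under different schema versions decode correctly:

query = (
    transform_df.writeStream
    .format("console")
    .option("truncate", "false")
    .start()
)
query.awaitTermination()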