spark-sql from_avro does not exist

37 views Asked by At

I create a sparkSession with Pyspark version 3.5.0 in the following way:

spark = SparkSession.builder \
    .appName("KafkaStreamingExample") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.spark:spark-avro_2.12:3.5.0") \
    .getOrCreate()

If I do a read config, it shows, that the class is loaded:

print(spark.conf.get("spark.jars.packages"))
org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.spark:spark-avro_2.12:3.5.0

Read a stream from Kafka:

df_customers = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:29092")
  .option("subscribe", "postgres.public.customers")
  .option("startingOffsets", "latest")
  .load()
)

And can see the output as binaries:

from IPython.display import display, clear_output
from time import sleep

while True:
    clear_output(wait=True)
    display(query.status)
    display(spark.sql('SELECT * from customers_query order by offset desc').show())
    sleep(10)

Example:

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}
+--------------------+--------------------+--------------------+---------+------+--------------------+-------------+
|                 key|               value|               topic|partition|offset|           timestamp|timestampType|
+--------------------+--------------------+--------------------+---------+------+--------------------+-------------+
|[00 00 00 00 01 4...|[00 00 00 00 02 0...|postgres.public.c...|        0| 17051|2023-12-08 09:57:...|            0|
|[00 00 00 00 01 4...|[00 00 00 00 02 0...|postgres.public.c...|        0| 17050|2023-12-08 09:57:...|            0|
|[00 00 00 00 01 4...|[00 00 00 00 02 0...|postgres.public.c...|        0| 17049|2023-12-08 09:57:...|            0|
|[00 00 00 00 01 4...|[00 00 00 00 02 0...|postgres.public.c...|        0| 17048|2023-12-08 09:57:...|            0|
|[00 00 00 00 01 4...|[00 00 00 00 02 0...|postgres.public.c...|        0| 17047|2023-12-08 09:57:...|            0|
|[00 00 00 00 01 4...|[00 00 00 00 02 0...|postgres.public.c...|        0| 17046|2023-12-08 09:57:...|            0|

But when I try to decode the values using from_avro it fails with the message:


Py4JError: An error occurred while calling z:org.apache.spark.sql.avro.functions.from_avro. Trace:
py4j.Py4JException: Method from_avro([class org.apache.spark.sql.Column, class java.util.HashMap, class java.util.HashMap]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
    at py4j.Gateway.invoke(Gateway.java:276)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:833)

I don't understand why. I call the function from_avro in the following way:

Read the schema:

topic = "postgres.public.customers-value"
#SCHEMA_REGISTRY_URL = "http://172.22.0.4:8081"
SCHEMA_REGISTRY_URL = "http://schema-registry:8081"
print("SCHEMA_REGISTRY_URL: ", SCHEMA_REGISTRY_URL)
URL = SCHEMA_REGISTRY_URL + '/subjects/' + topic + '/versions/latest/schema'
r = requests.get(url=URL)
schema = r.json()

Deserialise:

while True:
    clear_output(wait=True)
    display(query.status)
    display(spark.sql(from_avro(data=col("value"), jsonFormatSchema=schema)).show())
    sleep(10)

Even a simple version output = df_customers.select(from_avro("value", schema))is not finding the function. I don't understand why.

0

There are 0 answers