I create a SparkSession with PySpark version 3.5.0 in the following way:
spark = SparkSession.builder \
    .appName("KafkaStreamingExample") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.spark:spark-avro_2.12:3.5.0") \
    .getOrCreate()
If I read the configuration back, it shows that the packages are set:
print(spark.conf.get("spark.jars.packages"))
org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.spark:spark-avro_2.12:3.5.0
Read a stream from Kafka:
df_customers = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("subscribe", "postgres.public.customers")
    .option("startingOffsets", "latest")
    .load()
)
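The query handle and the customers_query table used in the monitoring loop below come from a memory-sink writeStream along these lines (a sketch; the sink name customers_query is inferred from the SQL query below):
# Assumed: expose the stream as an in-memory table for interactive inspection
query = (df_customers.writeStream
    .format("memory")               # memory sink, for notebook debugging only
    .queryName("customers_query")   # the table name queried via spark.sql below
    .outputMode("append")
    .start())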
I can see the output as raw binary values:
from IPython.display import display, clear_output
from time import sleep

while True:
    clear_output(wait=True)
    display(query.status)
    spark.sql('SELECT * FROM customers_query ORDER BY offset DESC').show()
    sleep(10)
Example:
{'message': 'Processing new data',
'isDataAvailable': True,
'isTriggerActive': True}
+--------------------+--------------------+--------------------+---------+------+--------------------+-------------+
| key| value| topic|partition|offset| timestamp|timestampType|
+--------------------+--------------------+--------------------+---------+------+--------------------+-------------+
|[00 00 00 00 01 4...|[00 00 00 00 02 0...|postgres.public.c...| 0| 17051|2023-12-08 09:57:...| 0|
|[00 00 00 00 01 4...|[00 00 00 00 02 0...|postgres.public.c...| 0| 17050|2023-12-08 09:57:...| 0|
|[00 00 00 00 01 4...|[00 00 00 00 02 0...|postgres.public.c...| 0| 17049|2023-12-08 09:57:...| 0|
|[00 00 00 00 01 4...|[00 00 00 00 02 0...|postgres.public.c...| 0| 17048|2023-12-08 09:57:...| 0|
|[00 00 00 00 01 4...|[00 00 00 00 02 0...|postgres.public.c...| 0| 17047|2023-12-08 09:57:...| 0|
|[00 00 00 00 01 4...|[00 00 00 00 02 0...|postgres.public.c...| 0| 17046|2023-12-08 09:57:...| 0|
But when I try to decode the values using from_avro, it fails with the following message:
Py4JError: An error occurred while calling z:org.apache.spark.sql.avro.functions.from_avro. Trace:
py4j.Py4JException: Method from_avro([class org.apache.spark.sql.Column, class java.util.HashMap, class java.util.HashMap]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at py4j.Gateway.invoke(Gateway.java:276)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:833)
I don't understand why. I call the from_avro function in the following way.
Read the schema from the Schema Registry:
import requests

topic = "postgres.public.customers-value"
# SCHEMA_REGISTRY_URL = "http://172.22.0.4:8081"
SCHEMA_REGISTRY_URL = "http://schema-registry:8081"
print("SCHEMA_REGISTRY_URL: ", SCHEMA_REGISTRY_URL)
URL = SCHEMA_REGISTRY_URL + '/subjects/' + topic + '/versions/latest/schema'
r = requests.get(url=URL)
schema = r.json()
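For reference, these are the imports used in the decoding step below; from_avro lives in pyspark.sql.avro.functions, and per the PySpark docs its jsonFormatSchema parameter is the Avro schema in JSON string format:
from pyspark.sql.functions import col
from pyspark.sql.avro.functions import from_avro  # from_avro(data, jsonFormatSchema, options=None)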
Deserialise:
while True:
    clear_output(wait=True)
    display(query.status)
    df_customers.select(from_avro(data=col("value"), jsonFormatSchema=schema)).show()
    sleep(10)
Even a simple version
output = df_customers.select(from_avro("value", schema))
does not find the function. I don't understand why.