I'm working on a project where I need to read data from a data source, wrap each record in a CloudEvent, and serialize it to Avro using the CloudEvents Java SDK. I'm struggling to find documentation or examples on how to convert a CloudEvent object to Avro.
I read my data from a data source using Spark
val newDataDF = inputDF.withColumn("cloudEventValue", createCloudEventUDF(to_json(struct(inputDF.columns.map(col): _*))))
import java.net.URI
import java.time.{Instant, OffsetDateTime, ZoneOffset}

import io.cloudevents.CloudEvent
import io.cloudevents.core.builder.CloudEventBuilder
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

private def createCloudEventUDF: UserDefinedFunction = udf { (newData: String) =>
  val event: CloudEvent = CloudEventBuilder.v1()
    .withId("000")
    .withType("example.demo")
    .withSource(URI.create("http://example.com"))
    .withDataContentType("application/json")
    .withTime(OffsetDateTime.ofInstant(Instant.ofEpochMilli(System.currentTimeMillis()), ZoneOffset.UTC))
    .withData(newData.getBytes())
    .build()
  event.toString // placeholder return; returning `event` itself fails
}
I currently return event.toString, but that's not the result I'm after. Returning event directly causes an error because Spark has no encoder for the CloudEvent class.
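One direction I'm considering (a sketch, not a confirmed solution): serialize the CloudEvent to a byte array inside the UDF via the SDK's EventFormat abstraction, so the UDF returns Array[Byte], which Spark encodes natively as BinaryType. This assumes an Avro EventFormat implementation is on the classpath (e.g. the cloudevents-avro-compact module, if your SDK version provides it); the content type string below is a placeholder and must match whatever that module actually registers.

```scala
import java.net.URI
import java.time.{Instant, OffsetDateTime, ZoneOffset}

import io.cloudevents.CloudEvent
import io.cloudevents.core.builder.CloudEventBuilder
import io.cloudevents.core.provider.EventFormatProvider
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

private def createCloudEventBytesUDF: UserDefinedFunction = udf { (newData: String) =>
  val event: CloudEvent = CloudEventBuilder.v1()
    .withId("000")
    .withType("example.demo")
    .withSource(URI.create("http://example.com"))
    .withDataContentType("application/json")
    .withTime(OffsetDateTime.ofInstant(Instant.now(), ZoneOffset.UTC))
    .withData(newData.getBytes("UTF-8"))
    .build()

  // Resolve a registered EventFormat by content type and serialize the whole
  // envelope to bytes. "application/cloudevents+avro" is an assumption --
  // check which content type your Avro format module registers.
  val format = EventFormatProvider.getInstance()
    .resolveFormat("application/cloudevents+avro")
  format.serialize(event) // Array[Byte] -- Spark encodes this as BinaryType
}
```

Returning bytes would sidestep the missing-encoder problem, but I don't know whether this is the intended way to get an Avro-formatted CloudEvent.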
Now I need to write newDataDF to a Kafka topic, with the cloudEventValue column serialized as Avro.
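For the Kafka write itself, this is the shape I have in mind, assuming cloudEventValue already holds the serialized bytes (topic name and broker address below are placeholders):

```scala
import org.apache.spark.sql.functions.col

// Kafka's Spark sink expects a "value" column of BinaryType or StringType,
// so a binary cloudEventValue column can be written as the record value directly.
newDataDF
  .select(col("cloudEventValue").alias("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("topic", "cloudevents-topic")                // placeholder topic
  .save()
```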
I've searched through the CloudEvents SDK documentation, but I couldn't find clear guidance on converting a CloudEvent object to Avro. Can someone provide insights or examples on how to achieve this conversion? Any help or pointers to relevant documentation would be greatly appreciated.