Scala - Convert Cloud Event to Avro Format


I'm working on a project where I need to read data from a data source and wrap it in an Avro record that follows the CloudEvents standard, using the CloudEvents Java SDK. I'm struggling to find documentation or examples showing how to convert a CloudEvent object to Avro.

I read my data from a data source using Spark and add a column that holds the CloudEvent:

val newDataDF = inputDF.withColumn("cloudEventValue", createCloudEventUDF(to_json(struct(inputDF.columns.map(col): _*))))

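import java.net.URI
import java.nio.charset.StandardCharsets
import java.time.{Instant, OffsetDateTime, ZoneOffset}
import io.cloudevents.CloudEvent
import io.cloudevents.core.builder.CloudEventBuilder
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

private def createCloudEventUDF: UserDefinedFunction = udf {
  (newData: String) =>
    val event: CloudEvent = CloudEventBuilder.v1()
      .withId("000")
      .withType("example.demo")
      .withSource(URI.create("http://example.com"))
      .withDataContentType("application/json")
      .withTime(OffsetDateTime.ofInstant(Instant.ofEpochMilli(System.currentTimeMillis()), ZoneOffset.UTC))
      .withData(newData.getBytes(StandardCharsets.UTF_8)) // explicit charset instead of the platform default
      .build()

    // I return event.toString, but it's not the result I was hoping for.
    // Returning just `event` causes an error because Spark has no encoder for the CloudEvent class.
    event.toString
}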

Now I need to write 'newDataDF' to a Kafka topic, but the 'cloudEventValue' column should be in Avro format.

I've searched through the Cloudevents-Scala documentation, but I couldn't find clear guidance on how to convert a CloudEvent object to Avro. Can someone provide insights or examples on how to achieve this conversion? Any help or pointers to relevant documentation would be greatly appreciated.
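
For illustration, here is a minimal sketch of one possible direction, not a confirmed CloudEvents SDK recipe. It sidesteps the missing-encoder problem by assembling the CloudEvents context attributes as a Spark struct column and encoding that struct with `to_avro` from the separate spark-avro module, instead of building a `CloudEvent` object inside a UDF. The object and helper names (`CloudEventAvroSketch`, `withAvroCloudEvent`, `writeToKafka`), the flat record layout, and the timestamp pattern are assumptions made for this example. In particular, the flat record is not the schema defined by the CloudEvents Avro event format, so consumers expecting that format would need a matching schema. The Kafka write also assumes the spark-sql-kafka integration package is on the classpath.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.avro.functions.to_avro
import org.apache.spark.sql.functions.{col, current_timestamp, date_format, lit, struct, to_json}

object CloudEventAvroSketch {

  // Hypothetical helper: serialises each input row to JSON, wraps it in a struct
  // whose fields mirror the CloudEvents v1 context attributes, and Avro-encodes
  // that struct into a binary "cloudEventValue" column.
  def withAvroCloudEvent(inputDF: DataFrame): DataFrame = {
    val payload = to_json(struct(inputDF.columns.map(col): _*))

    val event = struct(
      lit("000").as("id"),                          // placeholder id copied from the question
      lit("http://example.com").as("source"),
      lit("1.0").as("specversion"),
      lit("example.demo").as("type"),
      lit("application/json").as("datacontenttype"),
      // Assumed pattern for an RFC 3339 style timestamp string
      date_format(current_timestamp(), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX").as("time"),
      payload.cast("binary").as("data")             // JSON payload as UTF-8 bytes
    )

    // Without an explicit schema, to_avro derives the Avro schema from the struct's Spark type.
    inputDF.withColumn("cloudEventValue", to_avro(event))
  }

  // Hypothetical helper: the Kafka sink expects the payload in a column named "value".
  def writeToKafka(df: DataFrame, bootstrapServers: String, topic: String): Unit =
    df.select(col("cloudEventValue").as("value"))
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("topic", topic)
      .save()
}
```

Under these assumptions, usage would look like `CloudEventAvroSketch.writeToKafka(CloudEventAvroSketch.withAvroCloudEvent(inputDF), brokers, topic)`, where `brokers` and `topic` are supplied by the caller. Passing an explicit Avro schema string as the second argument of `to_avro` is an option when the consumer requires a fixed schema.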

