Handling schema changes in running Spark Streaming application


I am looking to build a Spark Streaming application using the DataFrames API on Spark 1.6. Before I get too far down the rabbit hole, I was hoping someone could help me understand how DataFrames deal with data whose schema changes over time.

The idea is that messages will flow into Kafka with an Avro schema. We should be able to evolve the schema in backwards compatible ways without having to restart the streaming application (the application logic will still work).

It appears trivial to deserialize new versions of messages by using a schema registry and the schema id embedded in each message: KafkaUtils creates a direct stream and Confluent's KafkaAvroDecoder does the decoding. That gets me as far as having a DStream.
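
For reference, a minimal sketch of that setup, assuming the Spark 1.6 KafkaUtils API and Confluent's KafkaAvroDecoder (the broker address, registry URL, topic, and ssc are placeholders):

import io.confluent.kafka.serializers.KafkaAvroDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// The schema registry URL lets the decoder resolve each message's writer
// schema from the id embedded in its payload.
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker:9092",
  "schema.registry.url" -> "http://schema-registry:8081"
)

// Values come back as Avro GenericData.Record instances (typed as Object here).
val stream = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](
  ssc, kafkaParams, Set("mytopic"))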

Problem #1: Within that DStream there will be objects with different versions of the schema. So when I translate each one into a Row object, I should pass in a reader schema that is the latest one to properly migrate the data, and I need to pass that latest schema into the sqlContext.createDataFrame(rowRdd, schema) call. The objects in the DStream are of type GenericData.Record, and as far as I can tell there is no easy way to tell which one carries the most recent version. I see two possible solutions: one is to call the schema registry to get the latest version of the schema on every microbatch; the other is to modify the decoder to attach the schema id, after which I could iterate over the RDD to find the highest id and get the schema from a local cache (see the sketch below).
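
For the second option, the Confluent wire format already prefixes every Avro payload with a magic byte of 0 followed by a 4-byte schema id, so the id can be read straight off the message bytes without modifying the decoder. A minimal sketch (the helper name is hypothetical):

import java.nio.ByteBuffer

// Reads the writer schema id from a Confluent-framed Avro message.
def embeddedSchemaId(messageBytes: Array[Byte]): Int = {
  val buffer = ByteBuffer.wrap(messageBytes)
  require(buffer.get() == 0, "not a Confluent-framed Avro message")
  buffer.getInt()
}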

I was hoping someone had already solved this nicely in a reusable way.

Problem/Question #2: Spark is going to have a different executor pulling from Kafka for each partition. What happens to my application when one executor receives a different "latest" schema than the others? The DataFrame created by one executor will have a different schema than another for the same time window. I don't actually know if this is a real problem or not. I am having trouble visualizing the flow of data and what kinds of operations would present problems. If it is a problem, it would imply that there needs to be some data sharing between executors, and that sounds both complicated and inefficient.

Do I need to worry about this? If I do, how do I resolve the schema differences?

Thanks, --Ben


There are 2 answers

Answer from Ben

I believe I have resolved this. I am using Confluent's schema registry and KafkaAvroDecoder. Simplified code looks like:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDecoder
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
// AvroSchemaConverter is assumed to be available, e.g. adapted from the
// spark-avro library's schema conversion utilities.

// Get the latest schema here. This schema will be used inside the
// closure below to ensure that all executors are using the same 
// version for this time slice.
val sr : CachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000)
val m = sr.getLatestSchemaMetadata(subject) // subject is typically "<topic>-value"
val schemaId = m.getId
val schemaString = m.getSchema

val outRdd = rdd.mapPartitions(partitions => {
  // Note: we cannot use the schema registry from above because this code
  // will execute on remote machines, requiring the schema registry to be
  // serialized. We could use a pool of these.
  val schemaRegistry : CachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000)
  val decoder: KafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry)
  val parser = new Schema.Parser()
  val avroSchema = parser.parse(schemaString)
  val avroRecordConverter = AvroSchemaConverter.createConverterToSQL(avroSchema)

  // Each element is assumed to be the raw Avro-encoded message value bytes.
  partitions.map(messageBytes => {
    // Decode the message using the latest version of the schema.
    // This will apply Avro's standard schema evolution rules 
    // (for compatible schemas) to migrate the message to the 
    // latest version of the schema.
    val record = decoder.fromBytes(messageBytes, avroSchema).asInstanceOf[GenericData.Record]
    // Convert record into a DataFrame with columns according to the schema
    avroRecordConverter(record).asInstanceOf[Row]
  })
})

// Get a Spark StructType representation of the schema to apply 
// to the DataFrame.
val sparkSchema = AvroSchemaConverter.toSqlType(
      new Schema.Parser().parse(schemaString)
    ).dataType.asInstanceOf[StructType]
sqlContext.createDataFrame(outRdd, sparkSchema)
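
On the "pool of these" note above, one possible refinement (a sketch only, not part of the original answer; the object name and registry URL are placeholders) is to hold a single cached client and decoder per executor JVM in an object:

object RegistryClients {
  // A Scala object is initialized lazily once per executor JVM and is never
  // serialized with the closure, so every partition on that executor reuses
  // the same client and decoder.
  lazy val client = new CachedSchemaRegistryClient("http://schema-registry:8081", 1000)
  lazy val decoder = new KafkaAvroDecoder(client)
}

Inside mapPartitions one would then reference RegistryClients.decoder instead of constructing a new KafkaAvroDecoder for every partition.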
Answer from Brian

I accomplished this using only Structured Streaming.

import java.util.Properties

import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDecoder, KafkaAvroDeserializerConfig}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData

import spark.implicits._

case class DeserializedFromKafkaRecord(value: String)
 
val brokers = "...:9092"
val schemaRegistryURL = "...:8081"
val topicRead = "mytopic"
 
 
val kafkaParams = Map[String, String](
  "kafka.bootstrap.servers" -> brokers,
  "group.id" -> "structured-kafka",
  "failOnDataLoss"-> "false",
  "schema.registry.url" -> schemaRegistryURL
)
    
object DeserializerWrapper {
  val props = new Properties()
  props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryURL)
  props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
  val vProps = new kafka.utils.VerifiableProperties(props)
  val deser = new KafkaAvroDecoder(vProps)
  val avro_schema = new RestService(schemaRegistryURL).getLatestVersion(topicRead + "-value")
  val messageSchema = new Schema.Parser().parse(avro_schema.getSchema)
}
 
val df = {spark
  .readStream
  .format("kafka")
  .option("subscribe", topicRead)
  .option("kafka.bootstrap.servers", brokers)
  .option("auto.offset.reset", "latest")
  .option("failOnDataLoss", false)
  .option("startingOffsets", "latest")
  .load()
  .map(x => {
    // Decode the Avro payload against the latest schema and keep its JSON string form.
    DeserializedFromKafkaRecord(
      DeserializerWrapper.deser
        .fromBytes(x.getAs[Array[Byte]]("value"), DeserializerWrapper.messageSchema)
        .asInstanceOf[GenericData.Record]
        .toString)
  })}
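
To complete the picture, a streaming query still has to be started on df. A minimal sketch using the console sink (the sink and output mode are illustrative, not part of the original answer):

val query = df.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()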