I am looking to build a Spark Streaming application using the DataFrames API on Spark 1.6. Before I get too far down the rabbit hole, I was hoping someone could help me understand how the DataFrames API deals with data whose schema varies from message to message.
The idea is that messages will flow into Kafka with an Avro schema. We should be able to evolve the schema in backwards-compatible ways without having to restart the streaming application (the application logic will still work).
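To be concrete about the kind of change I mean (the Payment record below is just an illustration, not our actual schema): a field added with a default value can be filled in when data written with the old schema is read with the new version as the reader schema.

    import java.io.ByteArrayOutputStream
    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
    import org.apache.avro.io.{DecoderFactory, EncoderFactory}

    // v1 of a hypothetical Payment record.
    val v1 = new Schema.Parser().parse(
      """{"type":"record","name":"Payment","fields":[
        |  {"name":"id","type":"string"},
        |  {"name":"amount","type":"double"}
        |]}""".stripMargin)

    // v2 adds "currency" with a default, a backwards-compatible change.
    val v2 = new Schema.Parser().parse(
      """{"type":"record","name":"Payment","fields":[
        |  {"name":"id","type":"string"},
        |  {"name":"amount","type":"double"},
        |  {"name":"currency","type":"string","default":"USD"}
        |]}""".stripMargin)

    // Write a record with the old (v1) schema...
    val rec = new GenericData.Record(v1)
    rec.put("id", "p-1")
    rec.put("amount", 9.99)
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](v1).write(rec, encoder)
    encoder.flush()

    // ...and read it back with v2 as the reader schema; "currency" gets its default.
    val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
    val migrated = new GenericDatumReader[GenericRecord](v1, v2).read(null, decoder)
    println(migrated) // {"id": "p-1", "amount": 9.99, "currency": "USD"}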
Using KafkaUtils to create a direct stream together with Confluent's KafkaAvroDecoder and a schema registry, it appears trivial to deserialize new versions of messages via the schema id embedded in each message. That gets me as far as having a DStream.
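Roughly what I have in mind for the stream setup (broker list, registry URL, and topic name are placeholders):

    import io.confluent.kafka.serializers.KafkaAvroDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("avro-stream")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // Placeholder broker list, schema registry URL, and topic name.
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "broker1:9092",
      "schema.registry.url"  -> "http://registry:8081",
      "auto.offset.reset"    -> "smallest")

    // KafkaAvroDecoder looks up the writer schema by the id embedded in each message
    // and hands back deserialized values as Object (GenericData.Record for record types).
    val stream = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](
      ssc, kafkaParams, Set("my-topic"))

    val records = stream.map(_._2) // values only; each one is a GenericData.Record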
Problem #1: Within that DStream there will be objects with different versions of the schema. So when I translate each one into a Row object, I should pass in a reader schema (the latest one) to properly migrate the data, and I also need to pass that latest schema into the sqlContext.createDataFrame(rowRdd, schema) call. The objects in the DStream are of type GenericData.Record, and as far as I can tell there is no easy way to tell which version is the most recent. I see two possible solutions: one is to call the schema registry for the latest version of the schema on every microbatch; the other is to modify the decoder to attach the schema id, then iterate over the RDD to find the highest id and get the schema from a local cache.
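The first option at least looks simple to sketch, assuming Confluent's CachedSchemaRegistryClient and the registry's default topic-name subject naming ("my-topic-value" is just an example):

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
    import org.apache.avro.Schema

    // Driver-side lookup of the latest registered schema, done once per microbatch.
    val registry = new CachedSchemaRegistryClient("http://registry:8081", 128)

    def latestSchema(subject: String): Schema =
      new Schema.Parser().parse(registry.getLatestSchemaMetadata(subject).getSchema)

    val readerSchema = latestSchema("my-topic-value")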
I was hoping someone had already solved this nicely in a reusable way.
Problem/Question #2: Spark is going to have a different executor pulling from Kafka for each partition. What happens to my application when one executor receives a different "latest" schema than the others? The DataFrame created by one executor would have a different schema than another's for the same time window. I don't actually know whether this is a real problem. I am having trouble visualizing the flow of data and which kinds of operations would present problems. If it is a problem, it would imply that there needs to be some data sharing between executors, and that sounds both complicated and inefficient.
Do I need to worry about this? If I do, how do I resolve the schema differences?
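My tentative thinking is that if the reader schema is resolved once per microbatch on the driver (option one from Problem #1), every partition gets converted against the same schema, so the resulting DataFrame is consistent. A minimal sketch of what I mean, assuming spark-avro's SchemaConverters for the Avro-to-StructType mapping (it may not be public in every spark-avro version) and a deliberately naive record-to-Row conversion; "stream" and "sqlContext" are the objects from above, and the registry URL and subject are placeholders:

    import com.databricks.spark.avro.SchemaConverters
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericData
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.StructType
    import scala.collection.JavaConverters._

    stream.map(_._2).foreachRDD { rdd =>
      // Driver side: one "latest" schema per batch, so all partitions agree.
      val registry = new CachedSchemaRegistryClient("http://registry:8081", 128)
      val latest = new Schema.Parser().parse(
        registry.getLatestSchemaMetadata("my-topic-value").getSchema)
      val sqlType = SchemaConverters.toSqlType(latest).dataType.asInstanceOf[StructType]
      val latestJson = latest.toString // ship the schema as JSON and re-parse on executors

      // Executor side: convert each GenericData.Record to a Row against that same schema.
      val rows = rdd.mapPartitions { records =>
        val schema = new Schema.Parser().parse(latestJson)
        val fields = schema.getFields.asScala
        records.map { value =>
          val record = value.asInstanceOf[GenericData.Record]
          // Naive field-by-field copy; a real converter would also handle unions,
          // nested records, logical types, etc.
          Row.fromSeq(fields.map { f =>
            record.get(f.name) match {
              case utf8: org.apache.avro.util.Utf8 => utf8.toString
              case other                           => other
            }
          })
        }
      }

      val df = sqlContext.createDataFrame(rows, sqlType)
      df.registerTempTable("events")
    }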
Thanks, --Ben
I believe I have resolved this. I am using Confluent's schema registry and KafkaAvroDecoder. Simplified code looks like: