Apache Flink read Avro byte[] from Kafka


In reviewing examples I see a lot of this:

FlinkKafkaConsumer08<Event> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);

In these examples the schema is already known up front.

I do not know the schema until I read the byte[] into a GenericRecord and then get the schema from it (it may change from record to record).

Can someone point me to a FlinkKafkaConsumer08 example that reads the raw byte[] into a map or filter step, so that I can remove some leading bytes and then load the remaining byte[] into a GenericRecord?


There are 2 answers

1
Dave Torok On BEST ANSWER

I'm doing something similar (I'm using the 09 consumer).

In your main code, pass in your custom deserializer:

FlinkKafkaConsumer09<Object> kafkaConsumer = new FlinkKafkaConsumer09<>(
                parameterTool.getRequired("topic"), new MyDeserializationSchema<>(),
                parameterTool.getProperties());

The custom DeserializationSchema reads the bytes, works out the schema (from the message itself and/or by retrieving it from a schema registry), deserializes the payload into a GenericRecord, and returns that GenericRecord object.

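import java.io.IOException;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
// In newer Flink versions this interface lives in org.apache.flink.api.common.serialization
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class MyDeserializationSchema<T> implements DeserializationSchema<T> {

    // The produced type is always GenericRecord; the cast only keeps the class generic
    @SuppressWarnings("unchecked")
    private final Class<T> avrotype = (Class<T>) GenericRecord.class;

    @Override
    @SuppressWarnings("unchecked")
    public T deserialize(byte[] message) throws IOException {
        // do your stuff here: strip off your leading bytes,
        // then deserialize the rest and create your GenericRecord (myavroevent)
        return (T) (myavroevent);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        // a Kafka topic is unbounded, so never signal end of stream
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avrotype);
    }

}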
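The "strip off your bytes" step could look roughly like the sketch below. It assumes the leading bytes follow the Confluent wire format (one magic byte 0x0, then a 4-byte big-endian schema id, then the Avro binary payload); the question does not say which framing is actually used, so adjust the header layout to your own. FramedAvroMessage is a made-up helper name, not part of Flink, Avro, or Confluent.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper: splits a framed Kafka message into schema id and Avro payload.
// Assumes a 5-byte header: magic byte 0x0 followed by a 4-byte big-endian schema id.
final class FramedAvroMessage {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5;

    final int schemaId;
    final byte[] avroPayload;

    private FramedAvroMessage(int schemaId, byte[] avroPayload) {
        this.schemaId = schemaId;
        this.avroPayload = avroPayload;
    }

    static FramedAvroMessage parse(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Message is shorter than the expected header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unexpected magic byte: " + magic);
        }
        int schemaId = buffer.getInt();
        // Everything after the header is the Avro-encoded record
        byte[] payload = Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
        return new FramedAvroMessage(schemaId, payload);
    }
}
```

Inside deserialize() you would then call FramedAvroMessage.parse(message), look up the writer schema for schemaId (how you do that depends on your registry), and decode the payload with Avro, for example new GenericDatumReader<GenericRecord>(schema).read(null, DecoderFactory.get().binaryDecoder(payload, null)).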
3
Svend On

If you use Confluent's schema registry, I believe the preferred solution is to use the Avro serde provided by Confluent. That way you just call deserialize(): the deserializer reads the schema id embedded in each message and fetches the matching Avro schema from the registry behind the scenes, so no manual byte manipulation is required.

It boils down to something like this (example code in Scala; a Java solution would be very similar):

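import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer}
import org.apache.avro.generic.GenericRecord
import scala.collection.JavaConverters._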

...

val valueDeserializer = new KafkaAvroDeserializer()
valueDeserializer.configure(
  Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava, 
  false)

...

override def deserialize(messageKey: Array[Byte], message: Array[Byte],
                         topic: String, partition: Int, offset: Long): KafkaKV = {

  val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[GenericRecord]
  val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]

  KafkaKV(key, value)
}

...

This method requires that the message producer is also integrated with the schema registry and publishes its schema there. That can be done in much the same way as above, using Confluent's KafkaAvroSerializer.

I posted a detailed explanation here: How to integrate Flink with Confluent's schema registry