Cannot print Kafka Avro decoded message

I have a legacy C++ system that emits binary-encoded Avro data in the Confluent Schema Registry format. In my Java application, I deserialized the message with the KafkaAvroDeserializer class without errors, but I cannot print the message contents.

private void consumeAvroData() {
    String group = "group1";
    Properties props = new Properties();
    // bootstrap.servers takes host:port pairs, not URLs
    props.put("bootstrap.servers", "1.2.3.4:9092");
    props.put("group.id", group);
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", LongDeserializer.class.getName());
    props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
    // props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false");
    props.put("schema.registry.url", "http://1.2.3.4:8081");
    // The key type must match the LongDeserializer configured above
    KafkaConsumer<Long, GenericRecord> consumer = new KafkaConsumer<>(props);

    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    System.out.println("Subscribed to topic " + TOPIC_NAME);

    while (true) {
        ConsumerRecords<Long, GenericRecord> records = consumer.poll(100);
        for (ConsumerRecord<Long, GenericRecord> record : records) {
            // GenericRecord.toString() renders the record as JSON
            System.out.printf("value = %s%n", record.value());
        }
    }
}

The output I get is:

{"value":"�"}

Why can't I print the deserialized data? Any help is appreciated!

1 answer

Answered by Hans Jespersen

The wire format used by the Confluent Avro serializer is documented in the "Wire Format" section of this page:

http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html

It's a single magic byte (currently always 0), followed by a 4-byte schema ID as returned by the Schema Registry, followed by the Avro-serialized data in Avro's binary encoding.

If you read the message as a byte array and print the first 5 bytes, you can tell whether it really is a Confluent Avro serialized message. The first byte should be 0, and the next four bytes should hold the schema ID (for example, 0 0 0 1 for schema ID 1). You can then check whether that ID is registered in the Schema Registry for this topic.
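The header check described above can be sketched in plain Java as follows. This is only an illustration, not part of the Confluent client: the class name WireFormatCheck and its methods are made up for this example, and the sample bytes in main are placeholders. It assumes you already have the raw message value as a byte[], for instance by temporarily setting value.deserializer to Kafka's ByteArrayDeserializer, and that the schema ID is stored in big-endian order.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for inspecting the 5-byte Confluent wire-format header.
class WireFormatCheck {
    static final byte MAGIC_BYTE = 0x0;  // documented magic byte value
    static final int HEADER_SIZE = 5;    // 1 magic byte + 4-byte schema ID

    // True if the payload is long enough and starts with the magic byte.
    static boolean looksLikeConfluentAvro(byte[] payload) {
        return payload != null
                && payload.length >= HEADER_SIZE
                && payload[0] == MAGIC_BYTE;
    }

    // Reads bytes 1..4 as a big-endian int (ByteBuffer's default byte order).
    static int schemaId(byte[] payload) {
        if (!looksLikeConfluentAvro(payload)) {
            throw new IllegalArgumentException("Not in Confluent wire format");
        }
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }

    // Hex dump of up to the first five bytes, for eyeballing the header.
    static String headerHex(byte[] payload) {
        StringBuilder sb = new StringBuilder();
        int n = Math.min(HEADER_SIZE, payload.length);
        for (int i = 0; i < n; i++) {
            if (i > 0) sb.append(' ');
            sb.append(String.format("%02x", payload[i] & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Placeholder message: magic byte, schema ID 1, then two payload bytes.
        byte[] sample = {0, 0, 0, 0, 1, 2, 0x68};
        System.out.println("first bytes: " + headerHex(sample));
        if (looksLikeConfluentAvro(sample)) {
            System.out.println("schema id:   " + schemaId(sample));
        } else {
            System.out.println("not a Confluent Avro message");
        }
    }
}
```

If the schema ID printed for your real messages does not exist in the Schema Registry, or the first byte is not 0, the producer is most likely not using this framing.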

If it's not in this format, the message was likely serialized another way (without the Confluent Schema Registry), and you need to use a different deserializer. In that case you may be able to extract the full schema from the message value itself, or you may need to obtain the original schema file from some other source in order to decode the data.