I am developing a Kafka Streams application consuming a topic with JSON Schema events. Following code works fine to consume the data properly according to documentation:
...
serdeConfig.put(KafkaJsonSchemaDeserializerConfig.JSON_VALUE_TYPE, "com.fasterxml.jackson.databind.JsonNode");
final Serde<JsonNode> jsonSerde = new KafkaJsonSchemaSerde<>();
jsonSerde.configure(serdeConfig, false);
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, jsonSerde))
.foreach((k, v) -> {
System.out.println(v.toString());
});
Now I do have a generic object JsonNode, similar to GenericRecord I would have in case of AVRO. What I need now is the schema ID of the consumed records. This information has to exist somewhere, since it is required to successfully parse the records (which has been done).
Where do I find this information?
I've tried using JsonSchemaUtils without success. Either of those functions returns junk, so I assume this is not the correct way:
JsonSchemaUtils.getSchema(v).name()
JsonSchemaUtils.getSchema(v).toString()
JsonSchemaUtils.getSchema(v).metadata().toString()
JsonSchemaUtils.getSchema(v).canonicalString()
JsonSchemaUtils.getSchema(v).rawSchema().getId()
The JsonNode class does not offer any possibility to retrieve a potential schema and it's also not part of the payload (in form of an envelope e.g.)