I am trying to read data from a Kafka topic that has an Avro schema. I have generated a POJO from my Avro schema, which I am using to deserialize the records. At least that is what I understood from reading Stream Processing with Apache Flink by Fabian Hueske: "POJOs, including classes generated by Apache Avro" can be used.
What I see in the output is that some fields default to generic records; usually it happens for DateTime fields. Below is the log message:
13:50:30,736 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Field Class#name will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
I don't understand why it is falling back to GenericType when I would like the POJO serializer to be used.
What am I doing wrong? Or have I understood it wrong?
I also read the documentation about serializers and TypeInformation, but I am still not able to make sense of why my approach is not working.
Below is my code:
KafkaSource<MyClass> kafkaSource = KafkaSource.<MyClass>builder()
        .setTopics(topic66)
        .setGroupId("flink-group")
        .setBootstrapServers("localhost:9092")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setDeserializer(new MyClassDeserializer(MyClass.class, SCHEMA_REGISTRY_URL))
        .build();

DataStream<MyClass> dataStream = env
        .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "reading Data")
        .returns(TypeInformation.of(MyClass.class));
And the code for deserializing:
public class MyClassDeserializer implements KafkaRecordDeserializationSchema<MyClass> {

    private final TypeInformation<MyClass> typeInformation;
    private final DeserializationSchema<MyClass> deserializationSchemaValue;

    public MyClassDeserializer(final Class<MyClass> trackClass, final String schemaRegistryUrl) {
        this.typeInformation = TypeInformation.of(trackClass);
        this.deserializationSchemaValue =
                ConfluentRegistryAvroDeserializationSchema.forSpecific(trackClass, schemaRegistryUrl);
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<MyClass> collector) throws IOException {
        try {
            collector.collect(deserializationSchemaValue.deserialize(consumerRecord.value()));
        } catch (IOException e) {
            System.out.println("error deserializing record");
        }
    }

    @Override
    public TypeInformation<MyClass> getProducedType() {
        return typeInformation;
    }
}
How can I deserialize using a POJO or Avro?
Even for what looks like a valid POJO, Flink will fall back to Kryo if any field in the POJO can't be serialized as a POJO. I believe this is the case for many of the date-time classes.
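You can confirm what Flink extracts by printing the type information (a minimal check, using MyClass from the question):

// A PojoType prints its field list, showing which fields were analyzed
// as POJO fields and which degraded to GenericType (Kryo); a class that
// fails the POJO rules entirely prints as a single GenericType.
System.out.println(TypeInformation.of(MyClass.class));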
You could force Flink to use the Avro serializer via env.getConfig().enableForceAvro(), or you could register a custom serializer for the problematic field type.
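For example (a sketch, not a drop-in fix: the Kryo registration assumes the problematic field is an org.joda.time.DateTime, which is common for Avro-generated timestamp fields, and it needs the de.javakaffee:kryo-serializers dependency on the classpath):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Option 1: serialize all types that would otherwise fall back to
// Kryo with Flink's Avro serializer instead.
env.getConfig().enableForceAvro();

// Option 2: keep the POJO serializer for the class, but register a
// Joda-aware Kryo serializer for the field type. The field is still
// treated as a GenericType, but Kryo now serializes it efficiently.
env.getConfig().addDefaultKryoSerializer(
        org.joda.time.DateTime.class,
        de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.class);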