Flink Avro Serialization shows "not serializable" error when working with GenericRecords


I'm really having a hard time getting Flink to communicate properly with a running Kafka instance, using an Avro schema from the Confluent Schema Registry (for both key and value).

After some thinking and restructuring of my program, I was able to get my implementation this far:

Producer Method

    public static FlinkKafkaProducer<Tuple2<GenericRecord,GenericRecord>> kafkaAvroGenericProducer() {  
        final Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "--.-.-.--:9092");
        properties.put("schema.registry.url", "http://--.-.-.---:8081");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KVSerializationSchema.class.getName()); //wrong class should not matter
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KVSerializationSchema.class.getName()); //wrong class but should not matter


        return new FlinkKafkaProducer<Tuple2<GenericRecord,GenericRecord>>("flink_output",
                new GenericSerializer("flink_output", schemaK, schemaV, "http://--.-.-.---:8081"),
                properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

    }

GenericSerializer.java

package com.reeeliance.flink;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import flinkfix.ConfluentRegistryAvroSerializationSchema;

public class GenericSerializer implements KafkaSerializationSchema<Tuple2<GenericRecord,GenericRecord>>{

    private String topic;   
    private Schema schemaKey;
    private Schema schemaValue;
    private String registryUrl;

    public GenericSerializer(String topic, Schema schemaK, Schema schemaV, String url) {
        super();
        this.topic = topic;
        this.schemaKey = schemaK;
        this.schemaValue = schemaV;
        this.registryUrl = url;
    }

    public GenericSerializer() {
        super();
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<GenericRecord,GenericRecord> element, Long timestamp) {
        byte[] key = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-key", schemaKey, registryUrl).serialize(element.f0);
        byte[] value = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-value", schemaValue, registryUrl).serialize(element.f1);

        return new ProducerRecord<byte[], byte[]>(topic, key, value);
    }

}

However, when I execute the Job, it fails in the preparation phase, before the Job actually runs, with the following error:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: [H_EQUNR type:STRING pos:0] is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:617)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:571)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:547)
    at com.reeeliance.flink.StreamingJob.kafkaAvroGenericProducer(StreamingJob.java:257)
    at com.reeeliance.flink.StreamingJob.main(StreamingJob.java:84)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
    - custom writeObject data (class "java.util.ArrayList")
    - root object (class "org.apache.avro.Schema$LockableArrayList", [H_EQUNR type:STRING pos:0])
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at java.util.ArrayList.writeObject(ArrayList.java:766)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
    ... 8 more

I know all classes have to implement the Serializable interface or be made transient, but I don't use any classes of my own, and the error does not point to a non-serializable function (as the usual threads on this topic do), but rather to a record or field. The field comes from the key schema, a schema containing only this one field. I assume my error lies somewhere in using GenericRecord, which does not implement the Serializable interface, but I see GenericRecord being used for this kind of serialization a lot, so it doesn't really make sense to me.
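
For context, the key schema behind [H_EQUNR type:STRING pos:0] can be reconstructed from the error message alone; a hypothetical sketch (the record name Key and the JSON layout are assumptions, only the H_EQUNR string field comes from the error) would be:

import org.apache.avro.Schema;

public class KeySchemaSketch {
    public static void main(String[] args) {
        Schema schemaK = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Key\",\"fields\":[{\"name\":\"H_EQUNR\",\"type\":\"string\"}]}");
        // Schema.Field instances in this list are what the ClosureCleaner later
        // fails to Java-serialize
        System.out.println(schemaK.getFields()); // e.g. [H_EQUNR type:STRING pos:0]
    }
}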

The class ConfluentRegistryAvroSerializationSchema is taken from GitHub, as it is not yet included in the Flink version (1.9.1) we are using. I pulled in the necessary classes and adapted them, and I don't think this is the reason for my problem. (Issue solved)

Can anybody help me debug this? I would also appreciate it a lot if you could show me a different way to achieve the same goal; the incompatibility of Flink Avro and the Confluent Schema Registry has been driving me crazy so far.

There are 3 answers

Dawid Wysakowicz (accepted answer)

The exception message tells you which class is not serializable.

Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field

The problem lies in the Schema class, which you store in the fields of your GenericSerializer.

You could try this:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import flinkfix.ConfluentRegistryAvroSerializationSchema;

public class GenericSerializer implements KafkaSerializationSchema<Tuple2<GenericRecord,GenericRecord>>{

    private final String topic;
    private final SerializationSchema<GenericRecord> keySerializer;
    private final SerializationSchema<GenericRecord> valueSerializer;

    public GenericSerializer(String topic, Schema schemaK, Schema schemaV, String url) {
        this.topic = topic;
        // build the Confluent serializers once; only their (serializable) instances
        // are stored, not the non-serializable Schema objects
        this.keySerializer = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-key", schemaK, url);
        this.valueSerializer = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-value", schemaV, url);
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<GenericRecord,GenericRecord> element, Long timestamp) {
        byte[] key = keySerializer.serialize(element.f0);
        byte[] value = valueSerializer.serialize(element.f1);

        return new ProducerRecord<byte[], byte[]>(topic, key, value);
    }

}

The ConfluentRegistryAvroSerializationSchema is serializable, therefore you can safely store it in a field of your GenericSerializer.

It will also be more performant, as the underlying structures will not be re-instantiated for every incoming record.
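
A quick way to verify the fix before submitting the job is to try Java-serializing the serializer instance directly; this is essentially what Flink's ClosureCleaner does through InstantiationUtil.serializeObject (visible in the stack trace above). A minimal sketch, assuming the corrected GenericSerializer above; the class name, the schema strings, and the single PAYLOAD value field are hypothetical, only the H_EQUNR key field comes from the question:

import org.apache.avro.Schema;
import org.apache.flink.util.InstantiationUtil;

public class GenericSerializerSmokeTest {
    public static void main(String[] args) throws Exception {
        // hypothetical schemas; only the H_EQUNR key field appears in the question
        Schema schemaK = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Key\",\"fields\":[{\"name\":\"H_EQUNR\",\"type\":\"string\"}]}");
        Schema schemaV = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"PAYLOAD\",\"type\":\"string\"}]}");

        // assumption: the registry does not need to be reachable at construction time
        GenericSerializer serializer = new GenericSerializer(
                "flink_output", schemaK, schemaV, "http://--.-.-.---:8081");

        // throws java.io.NotSerializableException if any field of the serializer is not
        // Java-serializable - the same check the ClosureCleaner performs
        InstantiationUtil.serializeObject(serializer);
        System.out.println("GenericSerializer survives Java serialization");
    }
}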

gem_freak

Was there any conclusion about the issue of Flink falling back to Kryo for Avro GenericRecord?

I'm using Scala and added the TypeInformation like this:

implicit val typeInformation: TypeInformation[GenericRecord] = TypeInformation.of(new TypeHint[GenericRecord] {
  new GenericRecordAvroTypeInfo(EventMessage.SCHEMA$)
})

The stream is set up like this:

DataStream[GenericRecord]

But the Flink runtime still falls back to Kryo because it cannot recognise the Avro GenericRecord and treats it as a generic type.
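
For what it's worth, one way to avoid the Kryo fallback is to attach a GenericRecordAvroTypeInfo to the stream explicitly; this is the same type information the answer below provides implicitly. A minimal sketch in the Java API used in the question (the schema, the toy pipeline, and the class name are hypothetical; only the H_EQUNR field is borrowed from the error above):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GenericRecordTypeInfoSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // hypothetical single-field schema, mirroring the key schema from the question
        String schemaString =
            "{\"type\":\"record\",\"name\":\"Key\",\"fields\":[{\"name\":\"H_EQUNR\",\"type\":\"string\"}]}";
        Schema schema = new Schema.Parser().parse(schemaString);

        DataStream<GenericRecord> records = env
            .fromElements("4711", "4712")
            .map(equnr -> {
                // parse inside the function from the (serializable) String so that no
                // non-serializable Schema object ends up in the closure
                GenericRecord record = new GenericData.Record(new Schema.Parser().parse(schemaString));
                record.put("H_EQUNR", equnr);
                return record;
            })
            // without this, Flink cannot analyse GenericRecord and falls back to Kryo
            .returns(new GenericRecordAvroTypeInfo(schema));

        records.print();
        env.execute("generic-record-typeinfo-sketch");
    }
}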

Chanpreet Chhabra

The problem is with the org.apache.avro.Schema$Field class. The class is not serializable, which results in this exception. The solution is mentioned in the Flink documentation as well, under the note section:

Since Avro’s Schema class is not serializable, it can not be sent around as is. You can work around this by converting it to a String and parsing it back when needed. If you only do this once on initialization, there is practically no difference to sending it directly.

So the parsing has to happen for each message we serialize; we can't parse the schema only once in the constructor and keep the resulting Schema in a field, because that would be the same as passing the Schema into the constructor directly and would bring back the serialization problem.

So the solution can look like the following snippet: we accept the Avro schema as a String in the constructor and build the Avro Schema in the serialize method.

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord
// the Confluent registry serializer: the backported class from the question on Flink 1.9,
// shipped with flink-avro-confluent-registry in later Flink versions
import flinkfix.ConfluentRegistryAvroSerializationSchema

class AvroMessageSerializationSchema(topic: String, schemaString: String, schemaRegistry: String) extends KafkaSerializationSchema[GenericRecord] {

  private def getSchema(schema: String): Schema = {
    new Schema.Parser().parse(schema)
  }

  override def serialize(element: GenericRecord, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    val schema = getSchema(schemaString)
    val value = ConfluentRegistryAvroSerializationSchema.forGeneric(topic, schema, schemaRegistry).serialize(element)
    new ProducerRecord[Array[Byte], Array[Byte]](topic, value)
  }
}

One more thing we need to keep in mind is to provide the TypeInformation that Flink requires to serialize the Avro records; otherwise it will fall back to Kryo for serialization.

implicit val typeInformation: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(avroSchema)
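
If parsing the schema and calling forGeneric for every record turns out to be too costly, a variant under the same assumptions is to keep the built serializer in a transient field that is initialized lazily on the first serialize() call, so the work happens once per task rather than once per record; this matches the "If you only do this once on initialization" part of the quoted documentation note. A sketch in the Java style of the question (the class name and the value-only layout are illustrative):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import flinkfix.ConfluentRegistryAvroSerializationSchema; // backported class from the question

public class LazyAvroValueSerializer implements KafkaSerializationSchema<GenericRecord> {

    private final String topic;
    private final String schemaString;  // a String is serializable, so it can travel with the job
    private final String registryUrl;

    // transient: not part of the Java-serialized object, rebuilt on each task
    private transient SerializationSchema<GenericRecord> valueSerializer;

    public LazyAvroValueSerializer(String topic, String schemaString, String registryUrl) {
        this.topic = topic;
        this.schemaString = schemaString;
        this.registryUrl = registryUrl;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(GenericRecord element, Long timestamp) {
        if (valueSerializer == null) {
            // parse the schema and build the Confluent serializer once per task,
            // not once per record
            Schema schema = new Schema.Parser().parse(schemaString);
            valueSerializer = ConfluentRegistryAvroSerializationSchema.forGeneric(
                topic + "-value", schema, registryUrl);
        }
        return new ProducerRecord<byte[], byte[]>(topic, valueSerializer.serialize(element));
    }
}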
