How to resolve NPE at com.esotericsoftware.kryo.Kryo.readObject when using Apache Flink?


I am using Flink with a custom Kryo serializer for my POJO class, but I am getting:

Caused by: java.lang.NullPointerException
    at MyTreeSerializer.read(MyTreeSerializer.java:36)
    at MyTreeSerializer.read(MyTreeSerializer.java:11)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:414)
    ... 16 more

Here are the details:

  1. Kryo version: 2.24.0

  2. My POJO class:

```java
import java.util.TreeMap;

public class MyTree extends TreeMap<String, Object> {
    private String id;
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
}
```

  3. Serializer for the POJO:

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.MapSerializer;

public class MyTreeSerializer extends Serializer<MyTree> {

    public MyTreeSerializer() {
    }

    @Override
    public void write(Kryo kryo, Output output, MyTree object) {
        output.writeString(object.getId());
        kryo.writeObject(output, object, new MapSerializer());
    }

    @Override
    public MyTree read(Kryo kryo, Input input, Class<MyTree> type) {
        String id = input.readString();
        System.out.println("Serialized Id " + id);
        MyTree myTree = kryo.readObject(input, type, new MapSerializer());
        System.out.println("Serialized Object " + myTree);
        myTree.setId(id);
        return myTree;
    }
}
```

  4. Flink streaming main program:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class MultiSinkTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Setting Serializer
        env.getConfig().addDefaultKryoSerializer(MyTree.class, MyTreeSerializer.class);

        DataStreamSource<String> data = env.fromElements("1", "2");

        DataStream<MyTree> returns = data.map(new MapFunction<String, MyTree>() {
            @Override
            public MyTree map(String s) throws Exception {
                MyTree myTree = new MyTree();
                myTree.setId(s);
                myTree.put("name", "sohi");
                return myTree;
            }
        }).returns(MyTree.class);

        returns.addSink(new SinkFunction<MyTree>() {
            @Override
            public void invoke(MyTree myTree) throws Exception {
                System.out.println("==> " + myTree.toString());
            }
        });

        env.execute();
    }
}
```

With all of the code above, only `id` is serialized, not the map part of `MyTree`.

But if I replace

`env.getConfig().addDefaultKryoSerializer(MyTree.class, MyTreeSerializer.class);`

with

`env.getConfig().addDefaultKryoSerializer(MyTree.class, MapSerializer.class);`

then the map is serialized, but `id` is not.

I just need help understanding why it does not work with `MyTreeSerializer.class`.

Thanks in advance.

Answer by nille85:

The following line in `MyTreeSerializer` returns `null`:

`MyTree myTree = kryo.readObject(input, type, new MapSerializer());`

That is also why `myTree.setId(id)` throws a `NullPointerException`.
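Why would that `readObject` call return `null`? One likely explanation, assuming Kryo's reference tracking is enabled (the Kryo 2.x default): `write` hands the very same `object` back to `kryo.writeObject`, so Kryo records the nested write as a back-reference to the instance it is already writing. On the read side, that back-reference points at the slot reserved for the outer `MyTree`, and that slot is only filled after `read` returns, so it resolves to `null`. The toy model below is not Kryo's code; `ToyReferenceTable` and `ToyReferenceDemo` are hypothetical names that only mimic this bookkeeping:

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model of identity-based reference tracking. NOT Kryo's implementation.
class ToyReferenceTable {
    /** Sentinel meaning "first occurrence: the caller serializes the contents". */
    static final int NEW_OBJECT = -1;

    // Write side: object identity -> reference id assigned on first write.
    private final Map<Object, Integer> writtenIds = new IdentityHashMap<>();
    // Read side: reference id -> object; a slot stays null until its object is fully read.
    private final List<Object> readObjects = new ArrayList<>();

    /** Returns NEW_OBJECT the first time an instance is seen, else its back-reference id. */
    int writeReference(Object object) {
        Integer id = writtenIds.get(object);
        if (id != null) {
            return id;
        }
        writtenIds.put(object, writtenIds.size());
        return NEW_OBJECT;
    }

    /** Reserves a slot for an object whose contents are about to be read. */
    int reserveReadSlot() {
        readObjects.add(null);
        return readObjects.size() - 1;
    }

    /** Fills the slot once the object has been read completely. */
    void completeRead(int id, Object object) {
        readObjects.set(id, object);
    }

    /** Resolves a back-reference; yields null while that object is still being read. */
    Object resolve(int id) {
        return readObjects.get(id);
    }
}

class ToyReferenceDemo {
    public static void main(String[] args) {
        TreeMap<String, Object> tree = new TreeMap<>();
        tree.put("name", "sohi");
        ToyReferenceTable refs = new ToyReferenceTable();

        // Writing: the outer write sees the instance for the first time...
        int outer = refs.writeReference(tree);
        // ...and the nested write of the *same* instance becomes a back-reference.
        int inner = refs.writeReference(tree);

        // Reading: a slot is reserved for the outer object before its contents are read...
        int slot = refs.reserveReadSlot();
        // ...so the nested back-reference is resolved while that slot is still empty.
        Object nested = refs.resolve(inner);

        System.out.println(outer + " " + inner + " " + slot + " " + nested);
    }
}
```

Running `ToyReferenceDemo` prints `-1 0 0 null`. Under the same assumption, this would also explain why only `id` shows up: the map entries are never written, only the back-reference is.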

When you use `MapSerializer` instead, it works (except, of course, that `id` is not deserialized), because `MyTree` extends `TreeMap`, which implements `Map`.
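Roughly speaking, a map serializer rebuilds the object by creating a new instance through its no-argument constructor and putting the entries back, so an extra field such as `id` is never touched. The sketch below mimics that with plain JDK collections; `MapOnlyCopy.copyEntriesOnly` is a hypothetical helper, not part of Kryo, and `MyTree` is repeated from the question so the block compiles on its own:

```java
import java.util.Map;
import java.util.TreeMap;

// Same shape as MyTree from the question: a TreeMap plus one extra field.
class MyTree extends TreeMap<String, Object> {
    private String id;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }
}

class MapOnlyCopy {
    // Hypothetical helper: rebuilds the object the way a map-only serializer
    // does, i.e. a new instance via the no-arg constructor plus the entries.
    static MyTree copyEntriesOnly(MyTree original) {
        MyTree copy = new MyTree();
        for (Map.Entry<String, Object> entry : original.entrySet()) {
            copy.put(entry.getKey(), entry.getValue());
        }
        return copy; // id was never copied, so it stays null
    }

    public static void main(String[] args) {
        MyTree tree = new MyTree();
        tree.setId("1");
        tree.put("name", "sohi");
        MyTree copy = copyEntriesOnly(tree);
        System.out.println(copy.getId() + " " + copy); // prints "null {name=sohi}"
    }
}
```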

In your implementation of `MyTreeSerializer`, you are effectively trying to deserialize a `MyTree` member out of a `MyTree` object. It is as if `MyTreeSerializer` expected an object like the one in the sample code below:


    public class MyTree extends TreeMap {
        private String id;
        private MyTree myTree;

        public String getId() {
            return id;
        }
        public void setId(String id) {
            this.id = id;
        }

        public MyTree getMyTree() {
            return myTree;
        }

        public void setMyTree(MyTree myTree) {
            this.myTree = myTree;
        } 
    }

I think you will need to look at `MapSerializer` and either extend it or use it as the basis for your own implementation in order to serialize and deserialize `MyTree` objects.
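A minimal sketch of the "extend it" route, assuming Kryo 2.24, where `MapSerializer` extends `Serializer<Map>` and its `write`, `read` and `copy` methods therefore use raw `Map` types. It writes `id` first and lets the inherited `MapSerializer` logic handle the entries, so the `MyTree` instance is never passed back into `kryo.writeObject`. `copy` is overridden as well because the stack trace shows Flink's `KryoSerializer.copy` in the call path, and the inherited `MapSerializer.copy` only copies entries. `MyTree` is repeated from the question so the block compiles on its own:

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.MapSerializer;
import java.util.Map;
import java.util.TreeMap;

// Same POJO as in the question.
class MyTree extends TreeMap<String, Object> {
    private String id;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }
}

// Sketch assuming Kryo 2.24: MapSerializer extends Serializer<Map>, so the
// overridden methods use raw Map / Class<Map> types.
class MyTreeSerializer extends MapSerializer {

    @Override
    public void write(Kryo kryo, Output output, Map map) {
        // Extra field first, then the inherited logic writes the entries directly;
        // the MyTree instance is never handed back to kryo.writeObject.
        output.writeString(((MyTree) map).getId());
        super.write(kryo, output, map);
    }

    @Override
    public Map read(Kryo kryo, Input input, Class<Map> type) {
        String id = input.readString(); // same order as write()
        // MapSerializer instantiates `type` (MyTree here) and reads the entries.
        MyTree tree = (MyTree) super.read(kryo, input, type);
        tree.setId(id);
        return tree;
    }

    @Override
    public Map copy(Kryo kryo, Map original) {
        // The inherited copy only copies entries, so carry id over explicitly.
        MyTree copy = (MyTree) super.copy(kryo, original);
        copy.setId(((MyTree) original).getId());
        return copy;
    }
}
```

It would be registered the same way as in the question, `env.getConfig().addDefaultKryoSerializer(MyTree.class, MyTreeSerializer.class);`. Kryo instantiates the serializer reflectively, so in a real project keep `MyTreeSerializer` public, in its own file, with a no-argument constructor, as the question's version already is.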