Getting ClassNotFoundException in Flink SourceFunction

I'm using Protocol Buffers to send a stream of data to Apache Flink. I have two classes: a Producer and a Consumer. Producer is a Java thread class (a Runnable) that reads data from a socket, deserializes it with Protobuf, and stores it in a BlockingQueue. Consumer is a class that implements Flink's SourceFunction. I tested the program using:

DataStream<Event.MyEvent> stream = env.fromCollection(queue);

instead of a custom source, and it works fine. But when I use a SourceFunction class, it throws this exception:

Caused by: java.lang.RuntimeException: Unable to find proto buffer class
at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
...
Caused by: java.lang.ClassNotFoundException: event.Event$MyEvent
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
...

In another attempt I merged both classes into one (the class that implements SourceFunction). It reads data from the socket, deserializes it with Protobuf, stores it in the BlockingQueue, and then reads from the BlockingQueue right after that. My code works fine with this approach too.

But I want to use two separate classes (multi-threading), and that setup throws the exception above. I have been trying to solve this for the last 2 days and have searched a lot, with no luck. Any help would be appreciated.

Producer:

public class Producer implements Runnable {

    Boolean running = true;
    Socket socket = null, bufferSocket = null;
    PrintStream ps = null;
    BlockingQueue<Event.MyEvent> queue;
    final int port;

    public Producer(BlockingQueue<Event.MyEvent> queue, int port){
        this.port = port;
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            socket = new Socket("127.0.0.1", port);
            bufferSocket = new Socket(InetAddress.getLocalHost(), 6060);
            ps = new PrintStream(bufferSocket.getOutputStream());
            while (running) {
                queue.put(Event.MyEvent.parseDelimitedFrom(socket.getInputStream()));
                ps.println("Items in Queue: " + queue.size());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

}

Consumer:

public class Consumer implements SourceFunction<Event.MyEvent> {

    // volatile: cancel() is called from a different thread than run()
    volatile boolean running = true;
    BlockingQueue<Event.MyEvent> queue;
    Event.MyEvent event;
    public Consumer(BlockingQueue<Event.MyEvent> queue){
        this.queue = queue;
    }

    @Override
    public void run(SourceContext<Event.MyEvent> sourceContext) {
        try {
            while (running) {
                event = queue.take();
                sourceContext.collect(event);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Event.MyEvent is my Protobuf class. I'm using Protobuf version 2.6.1 and compiled the classes with v2.6.1. I double-checked the versions to be sure that is not the problem. The Producer class works fine. I tested this with both Flink v1.1.3 and v1.1.4, running in local mode.


EDIT: Answer was included in question, posted it separately and removed it here.

UPDATE 12/28/2016

... But I'm still curious. What is causing this error? Is it a bug in Flink or am I doing something wrong?

...

There is 1 answer

Dennis Jaheruddin

The asker already found a way to make this work. I have extracted the relevant part from the question. Note that the reason why it happened remains unexplained.

I did not use quote syntax because it is a lot of text, but the text below was shared by the asker:

So finally I got it to work. I created my BlockingQueue object inside the SourceFunction (Consumer) and started the Producer from inside the SourceFunction class (Consumer), instead of creating the BlockingQueue and starting the Producer in the main method of the program. It now works!

Here's my full working code in Flink:

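import java.io.PrintStream;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import event.Event; // generated Protobuf class (package taken from the stack trace)

public class Main {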

public static void main(String[] args) throws Exception {

    final int port, buffer;
    //final String ip;
    try {
        final ParameterTool params = ParameterTool.fromArgs(args);
        port = params.getInt("p");
        buffer = params.getInt("b");
    } catch (Exception e) {
        System.err.println("No port number and/or buffer size specified.");
        return;
    }

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


    DataStream<Event.MyEvent> stream = env.addSource(new Consumer(port, buffer));
    //DataStream<Event.MyEvent> stream = env.fromCollection(queue);


    Pattern<Event.MyEvent, ?> crashedPattern = Pattern.<Event.MyEvent>begin("start")
            .where(new FilterFunction<Event.MyEvent>() {
                @Override
                public boolean filter(Event.MyEvent myEvent) throws Exception {
                    return (myEvent.getItems().getValue() >= 120);
                }
            })
            .<Event.MyEvent>followedBy("next").where(new FilterFunction<Event.MyEvent>() {
                @Override
                public boolean filter(Event.MyEvent myEvent) throws Exception {
                    return (myEvent.getItems().getValue() <= 10);
                }
            })
            .within(Time.seconds(3));

    PatternStream<Event.MyEvent> crashed = CEP.pattern(stream.keyBy(new KeySelector<Event.MyEvent, String>() {
        @Override
        public String getKey(Event.MyEvent myEvent) throws Exception {
            return myEvent.getEventType();
        }
    }), crashedPattern);

    DataStream<String> alarm = crashed.select(new PatternSelectFunction<Event.MyEvent, String>() {
        @Override
        public String select(Map<String, Event.MyEvent> pattern) throws Exception {
            Event.MyEvent start = pattern.get("start");
            Event.MyEvent next = pattern.get("next");
            return start.getEventType() + " | Speed from " + start.getItems().getValue() + " to " + next.getItems().getValue() + " in 3 seconds\n";
        }
    });

    DataStream<String> rate = alarm.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
            .apply(new AllWindowFunction<String, String, TimeWindow>() {
                @Override
                public void apply(TimeWindow timeWindow, Iterable<String> iterable, Collector<String> collector) throws Exception {
                    int sum = 0;
                    for (String s: iterable) {
                        sum ++;
                    }
                    collector.collect ("CEP Output Rate: " + sum + "\n");
                }
            });

    rate.writeToSocket(InetAddress.getLocalHost().getHostName(), 7070, new SimpleStringSchema());

    env.execute("Flink Taxi Crash Streaming");
}

private static class Producer implements Runnable {

    Boolean running = true;
    Socket socket = null, bufferSocket = null;
    PrintStream ps = null;
    BlockingQueue<Event.MyEvent> queue;
    final int port;

    Producer(BlockingQueue<Event.MyEvent> queue, int port){
        this.port = port;
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            socket = new Socket("127.0.0.1", port);
            bufferSocket = new Socket(InetAddress.getLocalHost(), 6060);
            ps = new PrintStream(bufferSocket.getOutputStream());
            while (running) {
                queue.put(Event.MyEvent.parseDelimitedFrom(socket.getInputStream()));
                ps.println("Items in Queue: " + queue.size());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

}

private static class Consumer implements SourceFunction<Event.MyEvent> {

    // volatile: cancel() is called from a different thread than run()
    volatile boolean running = true;
    final int port;
    BlockingQueue<Event.MyEvent> queue;

    Consumer(int port, int buffer){
        queue = new ArrayBlockingQueue<>(buffer);
        this.port = port;
    }

    @Override
    public void run(SourceContext<Event.MyEvent> sourceContext) {
        try {
            new Thread(new Producer(queue, port)).start();
            while (running) {
                sourceContext.collect(queue.take());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

} // closes class Main
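
The thread leaves the root cause open, so what follows is only a possible contributing factor, not a confirmed diagnosis. The stack trace fails inside GeneratedMessageLite$SerializedForm.readResolve, which runs when a Protobuf message is restored through Java serialization. Flink ships a SourceFunction to the task by Java-serializing the function object, so a BlockingQueue passed into the Consumer constructor is serialized together with whatever it holds at that moment, and the task works on a copy of it. The sketch below uses only the JDK to show that copy semantics. QueueHolder and roundTrip are hypothetical names standing in for the Consumer and for Flink's shipping step, and String elements stand in for Event.MyEvent. It does not reproduce the ClassNotFoundException itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class SerializedQueueDemo {

    // Hypothetical stand-in for a source function that receives its queue from main().
    static class QueueHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        final BlockingQueue<String> queue;

        QueueHolder(BlockingQueue<String> queue) {
            this.queue = queue;
        }
    }

    // Serializes an object to bytes and reads it back, as a stand-in for
    // shipping a function object from the client to a task.
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        queue.put("before"); // element present when the holder is serialized

        QueueHolder original = new QueueHolder(queue);
        QueueHolder copy = roundTrip(original);

        queue.put("after"); // added to the original queue only

        System.out.println(original.queue.size()); // prints 2
        System.out.println(copy.queue.size());     // prints 1
        System.out.println(copy.queue == queue);   // prints false
    }
}
```

The copy carries the elements that were queued at serialization time (so their classes must be loadable wherever it is deserialized) and never sees elements added to the original afterwards. Creating the queue and starting the Producer inside run(), as in the working code above, avoids serializing any queued events with the function.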