Apache Flink creates incorrect plan


I created a simple Job for Apache Flink that uses the PageRank implementation provided with Gelly.

Locally, running inside the IDE, everything works fine. However, when I submitted a JAR with my Job to a Flink instance running on my machine through the JobManager web interface, Flink did not produce the correct plan for the Job and did not execute PageRank. Instead, it presented and executed a very strange plan that only counts the number of vertices of the graph.

I did some research and debugging, and found out that the PageRank implementation provided with Gelly starts by calculating the number of vertices of the graph when that number is not provided as a parameter to the algorithm:

if (numberOfVertices == 0) {
    numberOfVertices = network.numberOfVertices();
}

This calculation implies an embedded job. Because the operators are lazy, nothing has been computed up to this point, so counting the vertices has to trigger a job of its own. On the Flink server, the first thing done is to obtain the job plan. This is done by a special environment, OptimizerPlanEnvironment, whose execute method looks like this:

public JobExecutionResult execute(String jobName) throws Exception {
    Plan plan = createProgramPlan(jobName);
    this.optimizerPlan = compiler.compile(plan);

    // do not go on with anything now!
    throw new ProgramAbortException();
}

This is where the issue comes from. As soon as the ProgramAbortException is thrown, the program returns the plan computed so far. But at that point only the plan of the inner (counting) job has been computed, so the plan of the main job is never computed or executed.

This is the code I used:

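// Imports assume the Gelly package layout of the Flink 0.10 era; paths may differ in other releases.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.examples.data.PageRankData;
import org.apache.flink.graph.library.PageRank;

public class Job {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Graph<Long, Double, Double> graph = Graph.fromDataSet(
            PageRankData.getDefaultEdgeDataSet(env), new VertexInit(), env);
        // No vertex count is passed, so PageRank calls numberOfVertices() internally
        graph.run(new PageRank<Long>(0.85, 10)).print();
    }

    // Initializes every vertex value to 1.0
    private static class VertexInit implements MapFunction<Long, Double> {
        @Override
        public Double map(Long value) throws Exception { return 1.0; }
    }
}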

If the number of vertices is provided, e.g. graph.run(new PageRank<Long>(0.85, 5, 10)), there is no problem: the plan is computed correctly and PageRank is calculated.
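The behaviour described above can be reproduced with a toy model in plain Java. This is a hypothetical sketch, not Flink's API: Env, count(), execute(), AbortException and userProgram only mimic the roles of ExecutionEnvironment, DataSet.count(), OptimizerPlanEnvironment.execute(), ProgramAbortException and the user's main method, and the returned vertex count of 5 is hard-coded. It shows that in plan-only mode the first execute() call wins, so with numberOfVertices == 0 the captured plan is the counting job, while passing the count captures the PageRank job.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (NOT Flink's API) of plan extraction meeting an eager count(). */
public class PlanExtractionModel {

    /** Stand-in for ProgramAbortException: stops the user program once a plan is captured. */
    static class AbortException extends RuntimeException {}

    /** Hypothetical environment that records operators and "executes" pending jobs. */
    static class Env {
        final boolean planOnly;                 // true models OptimizerPlanEnvironment
        final List<String> pending = new ArrayList<>();
        final List<String> executedJobs = new ArrayList<>();
        String capturedPlan;                    // only set in plan-only mode

        Env(boolean planOnly) { this.planOnly = planOnly; }

        void add(String operator) { pending.add(operator); }

        void execute() {
            String plan = String.join(" -> ", pending);
            pending.clear();
            if (planOnly) {
                capturedPlan = plan;            // compile the first plan seen...
                throw new AbortException();     // ...and abort the user program
            }
            executedJobs.add(plan);             // local mode: run the job and continue
        }

        /** Eager action: adds a counting sink and runs a job immediately. */
        long count() {
            add("source");
            add("count-sink");
            execute();
            return 5;                           // hard-coded vertex count of the toy graph
        }
    }

    /** Mirrors the `if (numberOfVertices == 0)` branch followed by building the main job. */
    static void userProgram(Env env, long numberOfVertices) {
        if (numberOfVertices == 0) {
            numberOfVertices = env.count();     // triggers an inner job
        }
        env.add("source");
        env.add("pagerank(n=" + numberOfVertices + ")");
        env.add("print-sink");
        env.execute();
    }

    /** Runs the program in plan-only mode and returns the plan that was captured. */
    static String capture(long numberOfVertices) {
        Env env = new Env(true);
        try {
            userProgram(env, numberOfVertices);
        } catch (AbortException expected) {
            // plan extraction ends here
        }
        return env.capturedPlan;
    }

    public static void main(String[] args) {
        System.out.println(capture(0));         // source -> count-sink
        System.out.println(capture(5));         // source -> pagerank(n=5) -> print-sink

        Env local = new Env(false);             // models running inside the IDE
        userProgram(local, 0);
        System.out.println(local.executedJobs.size()); // 2 (count job, then PageRank job)
    }
}
```

In local mode both jobs run one after the other, which is why the program behaves correctly inside the IDE.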

My question is: what am I doing wrong? Or is this some actual bug in Flink?

Best answer, by Till Rohrmann:

The problem is, as you've stated, that network.numberOfVertices() internally calls count() on the vertex data set. This triggers an independent Flink job which computes the count. Normally, the main method would retrieve this value and then go on to build and execute the main job. However, this does not work when the program is submitted through the web client, because the OptimizerPlanEnvironment only allows a single Flink job to be compiled. The behaviour is similar to the detached execution mode, which also does not support eager plan execution.

This is a limitation of Flink's web client at the moment. The reason for this behaviour is that Flink does not want to block a Netty channel handler thread, which would be necessary in order to wait for the result of the count operation. Such a blocking operation would starve the thread pool and make the web interface for this session unresponsive until the thread is unblocked.
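The starvation argument can be illustrated outside Flink with a small sketch. It is not Flink code: it assumes, for simplicity, a handler pool with a single thread, and HandlerStarvation and handlerStarves are hypothetical names. The 200 ms timeout only exists so the demo terminates instead of hanging. A handler task that blocks waiting for a result that must be produced by the same pool never receives that result:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Toy illustration (not Flink code): a single-threaded executor stands in for a handler pool. */
public class HandlerStarvation {

    /** Returns true if the handler gave up because the result it waited for was queued behind it. */
    static boolean handlerStarves(long timeoutMillis) {
        ExecutorService handlerPool = Executors.newSingleThreadExecutor();
        try {
            Future<Long> handler = handlerPool.submit(() -> {
                // The count result would be produced by another task on the same pool...
                Future<Long> countResult = handlerPool.submit(() -> 5L);
                // ...but the pool's only thread is busy right here, blocked waiting for it.
                return countResult.get(timeoutMillis, TimeUnit.MILLISECONDS);
            });
            handler.get();
            return false;                                    // result arrived: no starvation
        } catch (ExecutionException e) {
            return e.getCause() instanceof TimeoutException; // handler timed out waiting
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            handlerPool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(handlerStarves(200)); // prints true
    }
}
```

With a larger pool the same thing happens once every thread is blocked this way, which is what makes the web interface unresponsive.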