Use collect() and env.execute() in one Flink job

I'm trying to write a computation in Flink which requires two phases.

In the first phase, I create a Graph and get its vertex ids:

List<String> ids = graph.getVertexIds().collect();

In the second phase, I'd like to use these ids to run SingleSourceShortestPath for each vertex.

for (String id : ids) {
    System.out.println("Source Id: " + id);
    graph.run(new SingleSourceShortestPaths<String, String>(id, 10)).print();
}

It works locally (both in the IntelliJ IDE and from the command line with ./bin/flink run ...), but when I submit the job through Flink's WebUI, the program only executes up to the collect() call and never runs the rest of the program (the for loop and print()).

What is the problem?

Here is my code:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.library.SingleSourceShortestPaths;

import java.util.ArrayList;
import java.util.List;

public class Main {
    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Edge<String, Double> e1 = new Edge<String, Double>("1", "2", 0.5);
        Edge<String, Double> e2 = new Edge<String, Double>("2", "3", 0.5);
        Edge<String, Double> e3 = new Edge<String, Double>("4", "5", 0.5);
        Edge<String, Double> e4 = new Edge<String, Double>("5", "6", 0.5);
        Edge<String, Double> e5 = new Edge<String, Double>("7", "8", 0.5);

        List<Edge<String, Double>> edgeList = new ArrayList<Edge<String, Double>>();
        edgeList.add(e1);
        edgeList.add(e2);
        edgeList.add(e3);
        edgeList.add(e4);
        edgeList.add(e5);

        // Build the graph from the edge list; vertex values are initialized to the vertex ids.
        Graph<String, String, Double> graph = Graph.fromCollection(edgeList,
                new MapFunction<String, String>() {
                    public String map(String value) {
                        return value;
                    }
                }, env);

        // First phase: collect the vertex ids.
        List<String> ids = graph.getVertexIds().collect();

        // Second phase: run single-source shortest paths from each vertex and print the result.
        for (String id : ids) {
            System.out.println("Source Id: " + id);
            graph.run(new SingleSourceShortestPaths<String, String>(id, 10)).print();
        }
    }
}

Accepted answer by Fariba Hashemi:

Based on this link, Flink transformations are lazy, meaning that they are not executed until a sink operation is invoked.

A sink operation in Flink triggers the execution of a program to produce the desired result, such as saving it to the file system or printing it to standard output.

Methods such as DataSet.collect(), DataSet.count(), and DataSet.print() are sink operations that trigger the actual execution of the data transformations.
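
As a minimal standalone sketch of this behavior (an illustration, not part of the question above; the class name and output path are placeholders), nothing below runs until a sink is reached: collect() and print() each trigger their own job execution immediately, while a file sink such as writeAsText() only runs once env.execute() is called:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class LazySinksExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Transformations only build the execution plan; nothing is executed here.
        DataSet<Integer> doubled = env.fromElements(1, 2, 3, 4, 5)
                .map(new MapFunction<Integer, Integer>() {
                    public Integer map(Integer value) {
                        return value * 2;
                    }
                });

        // collect() is a sink: it triggers a job execution and brings the result
        // back to the client program.
        System.out.println(doubled.collect());

        // print() is also a sink and triggers a second, separate execution.
        doubled.print();

        // A file sink alone does not run anything; the job is only executed
        // when env.execute() is called. (The output path is a placeholder.)
        doubled.writeAsText("/tmp/doubled-output");
        env.execute("write doubled numbers");
    }
}

This is also why each iteration of the for loop in the question triggers a separate job: every print() on the shortest-paths result is its own sink.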