Not getting response for query in flink queryable state [version-1.7.2]

282 views Asked by At

I am querying to proxy server of flink cluster which is on 127.0.1.1:9069 but not getting response for query. I am calculating sum of all inputted numbers by creating a server on 9000 port. Also I am storing the sum in Value State.

Flink Job:

private transient ValueState<Tuple2<String, Long>> sum;

@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<String,Long>> out) throws Exception {
    if (input.f1==-1){
        sum.clear();
        return;
    }
    Tuple2<String, Long> currentSum = sum.value();
    currentSum.f1 += input.f1;


    sum.update(currentSum);
    System.out.println("Current Sum: "+(sum.value().f1)+"\nCurrent Count: "+(sum.value().f0));
        out.collect(new Tuple2<>("sum", sum.value().f1));
}

@Override
public void open(Configuration config) {
    ValueStateDescriptor<Tuple2<String, Long>> descriptor =
            new ValueStateDescriptor<>(
                    "sum", // the state name
                    TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}),
                    Tuple2.of("sum", 0L)); // default value of the state, if nothing was set
    sum = getRuntimeContext().getState(descriptor);

}

inp.flatMap(new FlatMapFunction<String, Tuple2<Long, Long>>() {
        @Override
        public void flatMap(String inpstr, Collector<Tuple2<Long, Long>> out) throws Exception{

            for (String word : inpstr.split("\\s")) {
                try {
                    if(word.equals("quit")){
                        throw new QuitValueState( "Stoppping!!!",hostname,port);
                    }
                    if(word.equals("clear")){
                        word="-1";
                    }
                    out.collect(Tuple2.of(1L, Long.valueOf(word)));
                }
                catch ( NumberFormatException e) {
                    System.out.println("Enter valid number: "+e.getMessage());
                }catch (QuitValueState ex){
                    System.out.println("Quitting!!!");
                }
            }
        }
    }).keyBy(0).flatMap(new StreamingJob())
            .keyBy(0).asQueryableState("query-name");

On flink cluster I am able to see proxy server at 127.0.1.1:9069

Client side:

public static void main(String[] args) throws IOException, InterruptedException, Exception {
    QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069);

    System.out.println("Querying on "+args[0]);
    JobID jobId = JobID.fromHexString(args[0]);
    ValueStateDescriptor<Tuple2<String, Long>> descriptor =
            new ValueStateDescriptor<>(
                    "sum",
                    TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {
                    }));


    CompletableFuture<ValueState<Tuple2<String, Long>>> resultFuture =
            client.getKvState(jobId, "query-name", "sum", BasicTypeInfo.STRING_TYPE_INFO, descriptor);
    System.out.println(resultFuture);
    resultFuture.thenAccept(response -> {
        try {
            Tuple2<String, Long> res = response.value();
            System.out.println("Queried sum value: " + res);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Exiting future ...");
    });
}
0

There are 0 answers