I am querying to proxy server of flink cluster which is on 127.0.1.1:9069 but not getting response for query. I am calculating sum of all inputted numbers by creating a server on 9000 port. Also I am storing the sum in Value State.
Flink Job:
private transient ValueState<Tuple2<String, Long>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<String,Long>> out) throws Exception {
if (input.f1==-1){
sum.clear();
return;
}
Tuple2<String, Long> currentSum = sum.value();
currentSum.f1 += input.f1;
sum.update(currentSum);
System.out.println("Current Sum: "+(sum.value().f1)+"\nCurrent Count: "+(sum.value().f0));
out.collect(new Tuple2<>("sum", sum.value().f1));
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<String, Long>> descriptor =
new ValueStateDescriptor<>(
"sum", // the state name
TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}),
Tuple2.of("sum", 0L)); // default value of the state, if nothing was set
sum = getRuntimeContext().getState(descriptor);
}
inp.flatMap(new FlatMapFunction<String, Tuple2<Long, Long>>() {
@Override
public void flatMap(String inpstr, Collector<Tuple2<Long, Long>> out) throws Exception{
for (String word : inpstr.split("\\s")) {
try {
if(word.equals("quit")){
throw new QuitValueState( "Stoppping!!!",hostname,port);
}
if(word.equals("clear")){
word="-1";
}
out.collect(Tuple2.of(1L, Long.valueOf(word)));
}
catch ( NumberFormatException e) {
System.out.println("Enter valid number: "+e.getMessage());
}catch (QuitValueState ex){
System.out.println("Quitting!!!");
}
}
}
}).keyBy(0).flatMap(new StreamingJob())
.keyBy(0).asQueryableState("query-name");
On flink cluster I am able to see proxy server at 127.0.1.1:9069
Client side:
public static void main(String[] args) throws IOException, InterruptedException, Exception {
QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069);
System.out.println("Querying on "+args[0]);
JobID jobId = JobID.fromHexString(args[0]);
ValueStateDescriptor<Tuple2<String, Long>> descriptor =
new ValueStateDescriptor<>(
"sum",
TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {
}));
CompletableFuture<ValueState<Tuple2<String, Long>>> resultFuture =
client.getKvState(jobId, "query-name", "sum", BasicTypeInfo.STRING_TYPE_INFO, descriptor);
System.out.println(resultFuture);
resultFuture.thenAccept(response -> {
try {
Tuple2<String, Long> res = response.value();
System.out.println("Queried sum value: " + res);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Exiting future ...");
});
}