Is it possible to access the current iteration index when performing iterations in Apache Flink 1.3.2 and Scala 2.10?
```scala
val initialData: DataSet[(ItemSet[T], Int)]

initialData.iterate(maxIterations) {
  current: DataSet[(ItemSet[T], Int)] =>
    val currentIteration = ??? // Is this accessible somehow?
    // ...
    current
}
```
Based on my search so far, the short answer is "No" and the slightly longer answer is that it may be possible to hack around this using Flink's raw state.
Solution #1: One way is to put a `System.out.println()` inside the iteration and print values to the console, or to write those values to a CSV file. Both approaches add some overhead.

Solution #2: Another way is to use a `Counter` (an accumulator) inside a map function that increments a value, and then read that value from the `JobExecutionResult` once the job has finished. Here is an example of how it can be done.
Let's say I am joining 2 streams and I want to count how many events are getting merged in total.
To do this, I write a `map` function that is called for each merged event and increments the counter with `countername.add(value_to_increment)`. The counter is also registered under the name `merged`, which is used at the end to fetch the result.

To get the results, use the `JobExecutionResult` returned when the job is executed: it holds the execution results, which can be queried after the job has finished. Reading the accumulator named `merged` from it gives the number of merged events.
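A minimal sketch of Solution #2 in Scala, under stated assumptions: it uses the DataSet API from the question instead of streams, joins two small hypothetical inputs, counts every joined record with an `IntCounter` registered as `merged` inside a `RichMapFunction`, and reads the total from the `JobExecutionResult` returned by `env.execute`. The names `CountMerged` and `MergedCounterExample`, and the input data, are hypothetical and only for illustration.

```scala
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

// Hypothetical map function applied to every joined ("merged") record.
class CountMerged extends RichMapFunction[(Int, String, String), (Int, String, String)] {
  // One counter per parallel task; Flink merges (sums) them when the job ends.
  private val merged = new IntCounter()

  override def open(parameters: Configuration): Unit = {
    // Register the counter under the name "merged" so it can be read from the job result.
    getRuntimeContext.addAccumulator("merged", merged)
  }

  override def map(value: (Int, String, String)): (Int, String, String) = {
    merged.add(1) // one increment per merged event
    value
  }
}

object MergedCounterExample {
  def countMerged(): Int = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical inputs standing in for the two sides of the join.
    val left = env.fromElements((1, "a"), (2, "b"), (3, "c"))
    val right = env.fromElements((1, "x"), (3, "y"), (4, "z"))

    // Inner join on the first field; each match is one "merged" event.
    val joined = left.join(right).where(0).equalTo(0) {
      (l: (Int, String), r: (Int, String)) => (l._1, l._2, r._2)
    }

    // env.execute() needs a sink; this one simply discards the records.
    joined
      .map(new CountMerged)
      .output(new DiscardingOutputFormat[(Int, String, String)])

    val result = env.execute("count merged events")
    // Accumulator values are only available once the job has finished.
    val count: Integer = result.getAccumulatorResult[Integer]("merged")
    count.intValue
  }

  def main(args: Array[String]): Unit =
    println(s"merged events: ${countMerged()}")
}
```

Because the accumulator is read from the `JobExecutionResult`, its value is only visible after the whole job completes; it is a way to collect totals, not a way to read the current iteration index from inside the step function while it runs.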
You can also call `System.out.println()` inside the map to see the values on the console (when running locally; on a cluster the output ends up in the TaskManagers' stdout files).

Please let me know if you have any further questions.