I would like to use the results of the batch execution (which computes the classic word-count example), saved in the `counts` variable, as the starting state for the streaming job that begins right after. Is this possible? For example, if the batch has computed (Hello, 2), (World, 1) and the first word from the stream is "Hello", I would like the first result to be (Hello, 3), (World, 1). Thank you.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import scala.Tuple2;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class UpdateWordCount {

    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) {
        // ---- Batch part: classic word count ----
        SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount").setMaster("local[2]");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);

        ArrayList<String> file = new ArrayList<String>();
        file.add("Hello");
        file.add("Hello");
        file.add("World");

        JavaRDD<String> linesB = ctx.parallelize(file);
        JavaRDD<String> wordsB = linesB.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) {
                return Arrays.asList(SPACE.split(s));
            }
        });
        JavaPairRDD<String, Integer> ones = wordsB.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        final JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
        ctx.stop();

        // ---- Streaming part: stateful word count with a 1 second batch size ----
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
        ssc.checkpoint("checkpointWordCount");

        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
                "localhost", 9999, StorageLevels.MEMORY_AND_DISK_SER);
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String x) {
                return Lists.newArrayList(SPACE.split(x));
            }
        });

        Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
                new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                    public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
                        return Optional.of(values.size() + state.or(0));
                    }
                };

        // I would like wordCounts to already contain the results of the previous batch, held in counts.
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).updateStateByKey(updateFunction);

        wordCounts.print();
        ssc.start();
        ssc.awaitTermination();
    }
}
How about you convert the results from the word-count JavaRDD into a Java Map (you could use countByValue()), and then, each time you encounter a word in the streaming window, update the map or add the word to it?
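To illustrate the idea without Spark: the merge logic is just "seed a map from the batch counts, then bump the entry for each streamed word". A minimal sketch, where `SeededWordCount` is a hypothetical helper class (not part of any Spark API) standing in for the state you would keep on the driver:

```java
import java.util.HashMap;
import java.util.Map;

public class SeededWordCount {

    // State seeded from the batch result, e.g. the Map returned by
    // wordsB.countByValue() in the question's code.
    private final Map<String, Long> state = new HashMap<String, Long>();

    public SeededWordCount(Map<String, Long> batchCounts) {
        state.putAll(batchCounts);
    }

    // Called for every word seen in a streaming micro-batch:
    // add to the existing count, or start at 1 for unseen words.
    public void update(String word) {
        Long previous = state.get(word);
        state.put(word, previous == null ? 1L : previous + 1L);
    }

    public long countOf(String word) {
        Long c = state.get(word);
        return c == null ? 0L : c;
    }

    public static void main(String[] args) {
        // Batch result from the question: (Hello, 2), (World, 1)
        Map<String, Long> batch = new HashMap<String, Long>();
        batch.put("Hello", 2L);
        batch.put("World", 1L);

        SeededWordCount wc = new SeededWordCount(batch);
        wc.update("Hello"); // first word from the stream

        System.out.println("Hello: " + wc.countOf("Hello")); // 3
        System.out.println("World: " + wc.countOf("World")); // 1
    }
}
```

Note this keeps the merged state in a plain map on the driver rather than inside `updateStateByKey`, so it only works if the foreach over the streamed words runs where the map lives (e.g. via `collect()` in `foreachRDD`).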