I am learning Apache Spark Streaming and tried to create a JavaPairInputDStream from a JavaStreamingContext. Below is my code:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
.......
.......
SparkConf sc = new SparkConf().setAppName("SparkStreamTest").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(3));
List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>();
data1.add(new Tuple2<String, String>("K1", "ABC"));
data1.add(new Tuple2<String, String>("K2", "DE"));
data1.add(new Tuple2<String, String>("K1", "F"));
data1.add(new Tuple2<String, String>("K3", "GHI"));
JavaPairRDD<String, String> pairs1 = jssc.sparkContext().parallelizePairs(data1);
List<Tuple2<String, Integer>> data2 = new ArrayList<Tuple2<String, Integer>>();
data2.add(new Tuple2<String, Integer>("K1", 123));
data2.add(new Tuple2<String, Integer>("K2", 456));
data2.add(new Tuple2<String, Integer>("K7", 0));
JavaPairRDD<String, String> pairs2 = jssc.sparkContext().parallelizePairs(data1);
Queue<JavaPairRDD<String, String>> inputQueue = new LinkedList<>(Arrays.asList(pairs1, pairs2));
JavaPairInputDStream<String, String> lines = jssc.queueStream(inputQueue, true);
But the last line of my application fails to compile with this error:
The method queueStream(Queue<JavaRDD<T>>, boolean) in the type JavaStreamingContext is not applicable for the arguments (Queue<JavaPairRDD<String,String>>, boolean)
I have no idea how to create a JavaPairInputDStream from a JavaStreamingContext.
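If you check the API for the queueStream method of the JavaStreamingContext class, it accepts a java.util.Queue<JavaRDD<T>> as the queue parameter. JavaPairRDD is not a subclass of JavaRDD, so a Queue<JavaPairRDD<String, String>> does not match that signature. I modified your program to build a Queue<JavaRDD<T>> instead, with T = Tuple2<String, String>. The queueStream method then returns a JavaInputDStream<T>, which you can transform into a JavaPairDStream<String, String> with mapToPair. The JavaPairDStream class is the superclass of the JavaPairInputDStream class, so the resulting stream offers the same key/value operations. Hope this helps.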
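A minimal sketch of such a modified program, assuming Spark 2.x or later and Java 8 lambdas. The class name QueueStreamPairs, the helper run, the 1-second batch interval and the timeout are illustrative choices, not taken from the original answer. It queues one RDD built with parallelize and one unwrapped from an existing JavaPairRDD, then turns the tuple stream into a JavaPairDStream with mapToPair.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class QueueStreamPairs {

    // Hypothetical helper: streams `data` twice through a queueStream and
    // returns every key/value pair that reached the JavaPairDStream.
    static List<Tuple2<String, String>> run(List<Tuple2<String, String>> data, long timeoutMs)
            throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("SparkStreamTest").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(1));

        // queueStream expects Queue<JavaRDD<T>>; here T is Tuple2<String, String>.
        Queue<JavaRDD<Tuple2<String, String>>> inputQueue = new LinkedList<>();
        // Option 1: parallelize (not parallelizePairs) already yields a JavaRDD of tuples.
        inputQueue.add(jsc.parallelize(data));
        // Option 2: unwrap an existing JavaPairRDD into a JavaRDD of tuples.
        inputQueue.add(jsc.parallelizePairs(data).rdd().toJavaRDD());

        // oneAtATime = true: one queued RDD is consumed per batch.
        JavaInputDStream<Tuple2<String, String>> lines = jssc.queueStream(inputQueue, true);
        // Identity mapping: each Tuple2 becomes a key/value pair of the pair stream.
        JavaPairDStream<String, String> pairs = lines.mapToPair(t -> t);

        // foreachRDD runs on the driver, so collecting into a local list works here.
        List<Tuple2<String, String>> seen = Collections.synchronizedList(new ArrayList<>());
        pairs.foreachRDD(rdd -> {
            seen.addAll(rdd.collect());
        });
        pairs.print();

        jssc.start();
        jssc.awaitTerminationOrTimeout(timeoutMs);
        jssc.stop(true, false); // also stops the underlying SparkContext
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) throws InterruptedException {
        List<Tuple2<String, String>> data1 = Arrays.asList(
                new Tuple2<>("K1", "ABC"), new Tuple2<>("K2", "DE"),
                new Tuple2<>("K1", "F"), new Tuple2<>("K3", "GHI"));
        System.out.println(run(data1, 10_000));
    }
}
```

The identity lambda in mapToPair only re-labels each Tuple2 as a key/value pair. The static helper JavaPairDStream.fromJavaDStream(lines) should give the same result without an explicit map step. Note that queueStream does not support checkpointing, so this pattern suits tests and experiments rather than production jobs.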