I am trying to access the Twitter API from my Spark program, written in Java. I get the error below whenever I run the code:
Caused by: java.lang.NumberFormatException: Not a version:
My code:
import java.util.Arrays;
import java.util.Properties;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.twitter.*;
import twitter4j.Status;
public class JavaStreamingCount {
    public static void main(String[] args) throws Exception {
        SparkConf sc = new SparkConf()
                .setAppName("JavaNetworkWordCount")
                .setMaster("local[2]")
                .setJars(new String[]{System.getProperty("user.dir") + "/target/spark-basics-0.0.1-SNAPSHOT-jar-with-dependencies.jar"});
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

        Properties props = new Properties();
        props.setProperty("oauth.consumerKey", "<my-key>");
        props.setProperty("oauth.consumerSecret", "<my-key>");
        props.setProperty("oauth.accessToken", "<my-key>");
        props.setProperty("oauth.accessTokenSecret", "<my-key>");
        System.setProperties(props);

        JavaReceiverInputDStream<Status> stream = TwitterUtils.createStream(ssc);

        JavaDStream<String> lines = stream.map(new Function<Status, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String call(Status v1) throws Exception {
                return v1.getText();
            }
        });

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String x) {
                return Arrays.asList(x.split(" "));
            }
        });

        words.print();
        ssc.start();
        ssc.awaitTermination();
    }
}
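A detail I noticed while debugging (an observation, not a confirmed diagnosis): `System.setProperties(props)` replaces the JVM's entire system property table rather than merging into it, so JVM-provided keys such as `java.version` and `os.name` disappear afterwards. That would line up with both the `NumberFormatException: Not a version:` from Scala's `Properties.isJavaAtLeast` and the later `NullPointerException` in snappy's `OSInfo.getOSName` in the log below. A minimal standalone sketch of the pitfall using only the plain JDK (the class name and the property key here are just illustrative):

```java
import java.util.Properties;

public class SetPropertiesPitfall {
    public static void main(String[] args) {
        // The JVM populates java.version, os.name, etc. at startup.
        Properties original = System.getProperties();
        System.out.println("before: java.version = " + System.getProperty("java.version"));

        // System.setProperties(...) REPLACES the whole table: any key absent
        // from the new Properties object, including java.version and os.name,
        // is gone afterwards.
        Properties oauth = new Properties();
        oauth.setProperty("oauth.consumerKey", "<my-key>");
        System.setProperties(oauth);
        System.out.println("after:  java.version = " + System.getProperty("java.version")); // prints null

        // Setting keys one at a time leaves the JVM defaults intact.
        System.setProperties(original);
        System.setProperty("oauth.consumerKey", "<my-key>");
        System.out.println("fixed:  java.version = " + System.getProperty("java.version")); // still set
    }
}
```

If this is the culprit, calling `System.setProperty(key, value)` for each OAuth key instead of `System.setProperties(props)` avoids clobbering the JVM defaults. (As far as I can tell, Twitter4J also reads these as system properties under the `twitter4j.` prefix, e.g. `twitter4j.oauth.consumerKey`.)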
Here's the full log:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/17 17:56:35 INFO SparkContext: Running Spark version 1.4.0
15/06/17 17:56:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/06/17 17:56:40 INFO SecurityManager: Changing view acls to: Ravitej.Somayajula
15/06/17 17:56:40 INFO SecurityManager: Changing modify acls to: Ravitej.Somayajula
15/06/17 17:56:40 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Ravitej.Somayajula); users with modify permissions: Set(Ravitej.Somayajula)
15/06/17 17:56:41 INFO Slf4jLogger: Slf4jLogger started
15/06/17 17:56:41 INFO Remoting: Starting remoting
15/06/17 17:56:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:49952]
15/06/17 17:56:41 INFO Utils: Successfully started service 'sparkDriver' on port 49952.
15/06/17 17:56:41 INFO SparkEnv: Registering MapOutputTracker
15/06/17 17:56:41 INFO SparkEnv: Registering BlockManagerMaster
15/06/17 17:56:41 INFO DiskBlockManager: Created local directory at C:\Users\ravitej.somayajula\AppData\Local\Temp\spark-55bd21ca-e955-4509-8780-d0d1385d36e9\blockmgr-d3a2b9f4-19c8-4b11-a4bb-13c83dba4a92
15/06/17 17:56:41 INFO MemoryStore: MemoryStore started with capacity 969.6 MB
15/06/17 17:56:41 INFO HttpFileServer: HTTP File server directory is C:\Users\ravitej.somayajula\AppData\Local\Temp\spark-55bd21ca-e955-4509-8780-d0d1385d36e9\httpd-061c66c1-bcef-4e21-8eaa-7d034405400b
15/06/17 17:56:41 INFO HttpServer: Starting HTTP Server
15/06/17 17:56:42 INFO Utils: Successfully started service 'HTTP file server' on port 49953.
15/06/17 17:56:42 INFO SparkEnv: Registering OutputCommitCoordinator
15/06/17 17:56:42 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/06/17 17:56:42 INFO SparkUI: Started SparkUI at http://10.25.170.67:4040
15/06/17 17:56:43 INFO SparkContext: Added JAR F:\SparkWorkspace\spark-basics-java/target/spark-basics-0.0.1-SNAPSHOT-jar-with-dependencies.jar at http://10.25.170.67:49953/jars/spark-basics-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1434544003003
15/06/17 17:56:43 INFO Executor: Starting executor ID driver on host localhost
15/06/17 17:56:44 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 49972.
15/06/17 17:56:44 INFO NettyBlockTransferService: Server created on 49972
15/06/17 17:56:44 INFO BlockManagerMaster: Trying to register BlockManager
15/06/17 17:56:44 INFO BlockManagerMasterEndpoint: Registering block manager localhost:49972 with 969.6 MB RAM, BlockManagerId(driver, localhost, 49972)
15/06/17 17:56:44 INFO BlockManagerMaster: Registered BlockManager
15/06/17 17:56:45 INFO ReceiverTracker: ReceiverTracker started
15/06/17 17:56:45 INFO ForEachDStream: metadataCleanupDelay = -1
15/06/17 17:56:45 INFO FlatMappedDStream: metadataCleanupDelay = -1
15/06/17 17:56:45 INFO MappedDStream: metadataCleanupDelay = -1
15/06/17 17:56:45 INFO TwitterInputDStream: metadataCleanupDelay = -1
15/06/17 17:56:45 INFO TwitterInputDStream: Slide time = 1000 ms
15/06/17 17:56:45 INFO TwitterInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/06/17 17:56:45 INFO TwitterInputDStream: Checkpoint interval = null
15/06/17 17:56:45 INFO TwitterInputDStream: Remember duration = 1000 ms
15/06/17 17:56:45 INFO TwitterInputDStream: Initialized and validated org.apache.spark.streaming.twitter.TwitterInputDStream@7b2e3031
15/06/17 17:56:45 INFO MappedDStream: Slide time = 1000 ms
15/06/17 17:56:45 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/06/17 17:56:45 INFO MappedDStream: Checkpoint interval = null
15/06/17 17:56:45 INFO MappedDStream: Remember duration = 1000 ms
15/06/17 17:56:45 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@155e2025
15/06/17 17:56:45 INFO FlatMappedDStream: Slide time = 1000 ms
15/06/17 17:56:45 INFO FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/06/17 17:56:45 INFO FlatMappedDStream: Checkpoint interval = null
15/06/17 17:56:45 INFO FlatMappedDStream: Remember duration = 1000 ms
15/06/17 17:56:45 INFO FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@56906818
15/06/17 17:56:45 INFO ForEachDStream: Slide time = 1000 ms
15/06/17 17:56:45 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/06/17 17:56:45 INFO ForEachDStream: Checkpoint interval = null
15/06/17 17:56:45 INFO ForEachDStream: Remember duration = 1000 ms
15/06/17 17:56:45 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@22a22c0e
Exception in thread "main" java.lang.ExceptionInInitializerError
at scala.collection.parallel.ParIterableLike$class.$init$(ParIterableLike.scala:166)
at scala.collection.parallel.mutable.ParArray.<init>(ParArray.scala:58)
at scala.collection.parallel.mutable.ParArray$.wrapOrRebuild(ParArray.scala:702)
at scala.collection.parallel.mutable.ParArray$.handoff(ParArray.scala:700)
at scala.collection.mutable.ArrayBuffer.par(ArrayBuffer.scala:74)
at org.apache.spark.streaming.DStreamGraph.start(DStreamGraph.scala:49)
at org.apache.spark.streaming.scheduler.JobGenerator.startFirstTime(JobGenerator.scala:188)
at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:94)
at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
at com.pearson.illuminati.java.spark.JavaStreamingCount.main(JavaStreamingCount.java:64)
Caused by: java.lang.NumberFormatException: Not a version:
at scala.util.PropertiesTrait$class.parts$1(Properties.scala:176)
at scala.util.PropertiesTrait$class.isJavaAtLeast(Properties.scala:180)
at scala.util.Properties$.isJavaAtLeast(Properties.scala:16)
at scala.collection.parallel.package$.getTaskSupport(package.scala:45)
at scala.collection.parallel.package$.<init>(package.scala:48)
at scala.collection.parallel.package$.<clinit>(package.scala)
... 13 more
15/06/17 17:56:45 INFO ReceiverTracker: Starting 1 receivers
15/06/17 17:56:45 INFO SparkContext: Starting job: start at JavaStreamingCount.java:64
15/06/17 17:56:45 INFO DAGScheduler: Got job 0 (start at JavaStreamingCount.java:64) with 1 output partitions (allowLocal=false)
15/06/17 17:56:45 INFO DAGScheduler: Final stage: ResultStage 0(start at JavaStreamingCount.java:64)
15/06/17 17:56:45 INFO DAGScheduler: Parents of final stage: List()
15/06/17 17:56:45 INFO DAGScheduler: Missing parents: List()
15/06/17 17:56:45 INFO DAGScheduler: Submitting ResultStage 0 (ParallelCollectionRDD[0] at start at JavaStreamingCount.java:64), which has no missing parents
java.lang.NullPointerException
at org.xerial.snappy.OSInfo.translateOSNameToFolderName(OSInfo.java:140)
at org.xerial.snappy.OSInfo.getOSName(OSInfo.java:107)
at org.xerial.snappy.OSInfo.getNativeLibFolderPathForCurrentOS(OSInfo.java:103)
at org.xerial.snappy.SnappyLoader.findNativeLibrary(SnappyLoader.java:284)
at org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:163)
at org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:145)
at org.xerial.snappy.Snappy.<clinit>(Snappy.java:47)
at org.apache.spark.io.SnappyCompressionCodec.<init>(CompressionCodec.scala:150)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
at org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:80)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:874)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:815)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/06/17 17:56:45 INFO TaskSchedulerImpl: Cancelling stage 0
15/06/17 17:56:45 INFO DAGScheduler: ResultStage 0 (start at JavaStreamingCount.java:64) failed in Unknown s
15/06/17 17:56:45 INFO DAGScheduler: Job 0 failed: start at JavaStreamingCount.java:64, took 0.131996 s
Exception in thread "Thread-27" org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.reflect.InvocationTargetException
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:526)
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:80)
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:874)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:815)
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:799)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1419)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:884)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:815)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/06/17 17:56:45 INFO SparkContext: Invoking stop() from shutdown hook
15/06/17 17:56:45 INFO SparkUI: Stopped Spark web UI at http://10.25.170.67:4040
15/06/17 17:56:45 INFO DAGScheduler: Stopping DAGScheduler
15/06/17 17:56:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/06/17 17:56:45 INFO Utils: path = C:\Users\ravitej.somayajula\AppData\Local\Temp\spark-55bd21ca-e955-4509-8780-d0d1385d36e9\blockmgr-d3a2b9f4-19c8-4b11-a4bb-13c83dba4a92, already present as root for deletion.
15/06/17 17:56:45 INFO MemoryStore: MemoryStore cleared
15/06/17 17:56:45 INFO BlockManager: BlockManager stopped
15/06/17 17:56:45 INFO BlockManagerMaster: BlockManagerMaster stopped
15/06/17 17:56:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
15/06/17 17:56:45 INFO SparkContext: Successfully stopped SparkContext
15/06/17 17:56:45 INFO Utils: Shutdown hook called
15/06/17 17:56:45 INFO Utils: Deleting directory C:\Users\ravitej.somayajula\AppData\Local\Temp\spark-55bd21ca-e955-4509-8780-d0d1385d36e9
15/06/17 17:56:45 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
My project's pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>spark-illuminati</groupId>
    <artifactId>spark-basics</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-twitter_2.10</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.twitter4j</groupId>
            <artifactId>twitter4j-core</artifactId>
            <version>[4.0,)</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                    <compilerVersion>1.7</compilerVersion>
                </configuration>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <excludes />
            </resource>
        </resources>
    </build>
</project>
Update: there was a version conflict between the Spark, spark-streaming-twitter and Scala versions, which I was eventually able to resolve.
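For anyone hitting a similar conflict: one way it can creep in with this pom is the open-ended `[4.0,)` range on `twitter4j-core`, which lets Maven resolve whatever twitter4j release is newest rather than the one `spark-streaming-twitter_2.10` was compiled against. A sketch of pinning it instead (the `4.0.4` version here is purely illustrative; run `mvn dependency:tree` to see which twitter4j version Spark actually pulls in and pin to that):

```xml
<dependency>
    <groupId>org.twitter4j</groupId>
    <artifactId>twitter4j-core</artifactId>
    <!-- Pin an exact version instead of the open range [4.0,);
         4.0.4 is a placeholder, match it to spark-streaming-twitter's
         own transitive dependency as shown by mvn dependency:tree. -->
    <version>4.0.4</version>
</dependency>
```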