NumberFormatException when trying to start JavaStreamingContext


I am trying to access the Twitter API from a Spark program written in Java. I get the error below whenever I run my code.

Caused by: java.lang.NumberFormatException: Not a version:

My code:

import java.util.Arrays;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.twitter.*;

import twitter4j.Status;

public class JavaStreamingCount {

    public static void main(String[] args) throws Exception {

        SparkConf sc = new SparkConf()
                .setAppName("JavaNetworkWordCount")
                .setMaster("local[2]")
                .setJars(new String[]{System.getProperty("user.dir") + "/target/spark-basics-0.0.1-SNAPSHOT-jar-with-dependencies.jar"});

        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

        Properties props = new Properties();
        props.setProperty("oauth.consumerKey","<my-key>");
        props.setProperty("oauth.consumerSecret","<my-key>");
        props.setProperty("oauth.accessToken","<my-key>");
        props.setProperty("oauth.accessTokenSecret","<my-key>");

        System.setProperties(props);

        JavaReceiverInputDStream<Status> stream =  TwitterUtils.createStream(ssc);

        JavaDStream<String> lines = stream.map(new Function<Status, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public String call(Status v1) throws Exception {
                return v1.getText();
            }

        });

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String x) {
                return Arrays.asList(x.split(" "));
            }

        });

        words.print();

        ssc.start();
        ssc.awaitTermination();


    }

}

Here's my full log:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/17 17:56:35 INFO SparkContext: Running Spark version 1.4.0
15/06/17 17:56:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/06/17 17:56:40 INFO SecurityManager: Changing view acls to: Ravitej.Somayajula
15/06/17 17:56:40 INFO SecurityManager: Changing modify acls to: Ravitej.Somayajula
15/06/17 17:56:40 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Ravitej.Somayajula); users with modify permissions: Set(Ravitej.Somayajula)
15/06/17 17:56:41 INFO Slf4jLogger: Slf4jLogger started
15/06/17 17:56:41 INFO Remoting: Starting remoting
15/06/17 17:56:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.25.170.67:49952]
15/06/17 17:56:41 INFO Utils: Successfully started service 'sparkDriver' on port 49952.
15/06/17 17:56:41 INFO SparkEnv: Registering MapOutputTracker
15/06/17 17:56:41 INFO SparkEnv: Registering BlockManagerMaster
15/06/17 17:56:41 INFO DiskBlockManager: Created local directory at C:\Users\ravitej.somayajula\AppData\Local\Temp\spark-55bd21ca-e955-4509-8780-d0d1385d36e9\blockmgr-d3a2b9f4-19c8-4b11-a4bb-13c83dba4a92
15/06/17 17:56:41 INFO MemoryStore: MemoryStore started with capacity 969.6 MB
15/06/17 17:56:41 INFO HttpFileServer: HTTP File server directory is C:\Users\ravitej.somayajula\AppData\Local\Temp\spark-55bd21ca-e955-4509-8780-d0d1385d36e9\httpd-061c66c1-bcef-4e21-8eaa-7d034405400b
15/06/17 17:56:41 INFO HttpServer: Starting HTTP Server
15/06/17 17:56:42 INFO Utils: Successfully started service 'HTTP file server' on port 49953.
15/06/17 17:56:42 INFO SparkEnv: Registering OutputCommitCoordinator
15/06/17 17:56:42 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/06/17 17:56:42 INFO SparkUI: Started SparkUI at http://10.25.170.67:4040
15/06/17 17:56:43 INFO SparkContext: Added JAR F:\SparkWorkspace\spark-basics-java/target/spark-basics-0.0.1-SNAPSHOT-jar-with-dependencies.jar at http://10.25.170.67:49953/jars/spark-basics-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1434544003003
15/06/17 17:56:43 INFO Executor: Starting executor ID driver on host localhost
15/06/17 17:56:44 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 49972.
15/06/17 17:56:44 INFO NettyBlockTransferService: Server created on 49972
15/06/17 17:56:44 INFO BlockManagerMaster: Trying to register BlockManager
15/06/17 17:56:44 INFO BlockManagerMasterEndpoint: Registering block manager localhost:49972 with 969.6 MB RAM, BlockManagerId(driver, localhost, 49972)
15/06/17 17:56:44 INFO BlockManagerMaster: Registered BlockManager
15/06/17 17:56:45 INFO ReceiverTracker: ReceiverTracker started
15/06/17 17:56:45 INFO ForEachDStream: metadataCleanupDelay = -1
15/06/17 17:56:45 INFO FlatMappedDStream: metadataCleanupDelay = -1
15/06/17 17:56:45 INFO MappedDStream: metadataCleanupDelay = -1
15/06/17 17:56:45 INFO TwitterInputDStream: metadataCleanupDelay = -1
15/06/17 17:56:45 INFO TwitterInputDStream: Slide time = 1000 ms
15/06/17 17:56:45 INFO TwitterInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/06/17 17:56:45 INFO TwitterInputDStream: Checkpoint interval = null
15/06/17 17:56:45 INFO TwitterInputDStream: Remember duration = 1000 ms
15/06/17 17:56:45 INFO TwitterInputDStream: Initialized and validated org.apache.spark.streaming.twitter.TwitterInputDStream@7b2e3031
15/06/17 17:56:45 INFO MappedDStream: Slide time = 1000 ms
15/06/17 17:56:45 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/06/17 17:56:45 INFO MappedDStream: Checkpoint interval = null
15/06/17 17:56:45 INFO MappedDStream: Remember duration = 1000 ms
15/06/17 17:56:45 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@155e2025
15/06/17 17:56:45 INFO FlatMappedDStream: Slide time = 1000 ms
15/06/17 17:56:45 INFO FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/06/17 17:56:45 INFO FlatMappedDStream: Checkpoint interval = null
15/06/17 17:56:45 INFO FlatMappedDStream: Remember duration = 1000 ms
15/06/17 17:56:45 INFO FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@56906818
15/06/17 17:56:45 INFO ForEachDStream: Slide time = 1000 ms
15/06/17 17:56:45 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/06/17 17:56:45 INFO ForEachDStream: Checkpoint interval = null
15/06/17 17:56:45 INFO ForEachDStream: Remember duration = 1000 ms
15/06/17 17:56:45 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@22a22c0e
Exception in thread "main" java.lang.ExceptionInInitializerError
    at scala.collection.parallel.ParIterableLike$class.$init$(ParIterableLike.scala:166)
    at scala.collection.parallel.mutable.ParArray.<init>(ParArray.scala:58)
    at scala.collection.parallel.mutable.ParArray$.wrapOrRebuild(ParArray.scala:702)
    at scala.collection.parallel.mutable.ParArray$.handoff(ParArray.scala:700)
    at scala.collection.mutable.ArrayBuffer.par(ArrayBuffer.scala:74)
    at org.apache.spark.streaming.DStreamGraph.start(DStreamGraph.scala:49)
    at org.apache.spark.streaming.scheduler.JobGenerator.startFirstTime(JobGenerator.scala:188)
    at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:94)
    at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
    at com.pearson.illuminati.java.spark.JavaStreamingCount.main(JavaStreamingCount.java:64)
Caused by: java.lang.NumberFormatException: Not a version: 
    at scala.util.PropertiesTrait$class.parts$1(Properties.scala:176)
    at scala.util.PropertiesTrait$class.isJavaAtLeast(Properties.scala:180)
    at scala.util.Properties$.isJavaAtLeast(Properties.scala:16)
    at scala.collection.parallel.package$.getTaskSupport(package.scala:45)
    at scala.collection.parallel.package$.<init>(package.scala:48)
    at scala.collection.parallel.package$.<clinit>(package.scala)
    ... 13 more
15/06/17 17:56:45 INFO ReceiverTracker: Starting 1 receivers
15/06/17 17:56:45 INFO SparkContext: Starting job: start at JavaStreamingCount.java:64
15/06/17 17:56:45 INFO DAGScheduler: Got job 0 (start at JavaStreamingCount.java:64) with 1 output partitions (allowLocal=false)
15/06/17 17:56:45 INFO DAGScheduler: Final stage: ResultStage 0(start at JavaStreamingCount.java:64)
15/06/17 17:56:45 INFO DAGScheduler: Parents of final stage: List()
15/06/17 17:56:45 INFO DAGScheduler: Missing parents: List()
15/06/17 17:56:45 INFO DAGScheduler: Submitting ResultStage 0 (ParallelCollectionRDD[0] at start at JavaStreamingCount.java:64), which has no missing parents
java.lang.NullPointerException
    at org.xerial.snappy.OSInfo.translateOSNameToFolderName(OSInfo.java:140)
    at org.xerial.snappy.OSInfo.getOSName(OSInfo.java:107)
    at org.xerial.snappy.OSInfo.getNativeLibFolderPathForCurrentOS(OSInfo.java:103)
    at org.xerial.snappy.SnappyLoader.findNativeLibrary(SnappyLoader.java:284)
    at org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:163)
    at org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:145)
    at org.xerial.snappy.Snappy.<clinit>(Snappy.java:47)
    at org.apache.spark.io.SnappyCompressionCodec.<init>(CompressionCodec.scala:150)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
    at org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:80)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:874)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:815)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/06/17 17:56:45 INFO TaskSchedulerImpl: Cancelling stage 0
15/06/17 17:56:45 INFO DAGScheduler: ResultStage 0 (start at JavaStreamingCount.java:64) failed in Unknown s
15/06/17 17:56:45 INFO DAGScheduler: Job 0 failed: start at JavaStreamingCount.java:64, took 0.131996 s
Exception in thread "Thread-27" org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.reflect.InvocationTargetException
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:526)
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:80)
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:874)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:815)
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:799)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1419)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:884)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:815)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/06/17 17:56:45 INFO SparkContext: Invoking stop() from shutdown hook
15/06/17 17:56:45 INFO SparkUI: Stopped Spark web UI at http://10.25.170.67:4040
15/06/17 17:56:45 INFO DAGScheduler: Stopping DAGScheduler
15/06/17 17:56:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/06/17 17:56:45 INFO Utils: path = C:\Users\ravitej.somayajula\AppData\Local\Temp\spark-55bd21ca-e955-4509-8780-d0d1385d36e9\blockmgr-d3a2b9f4-19c8-4b11-a4bb-13c83dba4a92, already present as root for deletion.
15/06/17 17:56:45 INFO MemoryStore: MemoryStore cleared
15/06/17 17:56:45 INFO BlockManager: BlockManager stopped
15/06/17 17:56:45 INFO BlockManagerMaster: BlockManagerMaster stopped
15/06/17 17:56:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
15/06/17 17:56:45 INFO SparkContext: Successfully stopped SparkContext
15/06/17 17:56:45 INFO Utils: Shutdown hook called
15/06/17 17:56:45 INFO Utils: Deleting directory C:\Users\ravitej.somayajula\AppData\Local\Temp\spark-55bd21ca-e955-4509-8780-d0d1385d36e9
15/06/17 17:56:45 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.

My project's pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>spark-illuminati</groupId>
    <artifactId>spark-basics</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-twitter_2.10</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.twitter4j</groupId>
            <artifactId>twitter4j-core</artifactId>
            <version>[4.0,)</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>

            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                    <compilerVersion>1.7</compilerVersion>
                </configuration>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <excludes />
            </resource>
        </resources>
    </build>
</project>

There is 1 answer

Ravitej Somayajula (best answer)

There seems to have been a version conflict between the Spark, spark-streaming-twitter, and Scala versions, which I was able to resolve.
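
The log is also consistent with a second cause that is worth ruling out. `System.setProperties(props)` replaces the JVM's entire system property set rather than adding to it, so standard entries such as `java.specification.version` and `os.name` are no longer defined afterwards. That would explain both the empty version string in `Not a version:` (raised from `scala.util.Properties.isJavaAtLeast`) and the `NullPointerException` in snappy's `OSInfo`. Below is a minimal sketch of the alternative, which sets each key individually. The class and method names are hypothetical, and the key names are copied from the question.

```java
import java.util.Properties;

// Hypothetical helper, not part of Spark or Twitter4J.
final class OAuthSystemProperties {

    // Adds each entry to the existing system properties one at a time,
    // so the JVM's built-in entries (java.version, os.name, ...) are kept.
    static void applyOAuthProperties(Properties props) {
        for (String name : props.stringPropertyNames()) {
            System.setProperty(name, props.getProperty(name));
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("oauth.consumerKey", "<my-key>");
        props.setProperty("oauth.consumerSecret", "<my-key>");
        props.setProperty("oauth.accessToken", "<my-key>");
        props.setProperty("oauth.accessTokenSecret", "<my-key>");

        // Used in place of System.setProperties(props).
        applyOAuthProperties(props);

        // The built-in properties are still defined after the call.
        System.out.println(System.getProperty("java.version"));
        System.out.println(System.getProperty("os.name"));
    }
}
```

Spark's own Twitter examples pass these credentials as `twitter4j.oauth.consumerKey` and so on, so the `twitter4j.` prefix may also be needed when the keys are supplied as system properties.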