How to create an executable jar reading files from the local file system


I've been struggling to build an executable jar from Spark/Scala code.

I have to create a web interface that shows the result of a Spark/Scala job.

I intend to use JavaScript to call this jar and read its result. So first I created a simple program, code#1, that works and will be extended later. Running it from the sbt shell, it works properly.

Here's code#1:

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}

package com.parser{

object Main {

    def g(x: String, y: String = "2010"): org.apache.spark.sql.DataFrame = {
        val conf = new SparkConf().setAppName("BigData").setMaster("local[*]").set("spark.executor.memory", "2g") //.setMaster("spark://myhost:7077")
        conf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
        conf.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
        val sc = new SparkContext(conf)
        val csv = sc.textFile("file://~/BigData/CSV/Casos_Notificados_Dengue_" + x + "_" + y + ".csv")
        // Drop the first two lines of the file on the first partition (they are not data rows)
        val rdd = csv.mapPartitionsWithIndex(
            ((i, iterator) => if (i == 0 && iterator.hasNext) {
                iterator.next
                iterator.next
                iterator
            } else iterator), true)
        var schemaArray = csv.collect()(1).split(",")
        schemaArray(0) = "NU_NOTIF" // Fixing the header change from 2011 to 2012
        val schema =
            StructType(
                schemaArray.map(fieldName => 
                if(fieldName == "NU_NOTIF") StructField(fieldName, StringType, false)
                else StructField(fieldName, StringType, true)))
        val rowRDD = rdd.map(_.split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)")).map(p => Row.fromSeq(p))
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        val schemaRDD = sqlContext.applySchema(rowRDD, schema)
        schemaRDD.registerTempTable("casos")
        val r = sqlContext.sql("SELECT NU_NOTIF,NU_ANO,Long_WGS84,Lat_WGS84 FROM casos")
        return r
    }

    def main(args: Array[String]) {
        val tables = g("01","2010")
        tables.foreach(println)
    }
}

}
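One variant I understand is often suggested is to put those two fs.*.impl settings on the SparkContext's hadoopConfiguration instead of the SparkConf, since that is the configuration Spark hands to the Hadoop input/output formats. A minimal sketch of what I mean (FsConfigSketch is just a placeholder name, and I'm not sure this changes anything here):

import org.apache.spark.{SparkConf, SparkContext}

object FsConfigSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("BigData").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Same two settings as in g() above, but on the Hadoop Configuration
    // owned by the SparkContext rather than on the SparkConf.
    sc.hadoopConfiguration.set("fs.file.impl",
      classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
    sc.hadoopConfiguration.set("fs.hdfs.impl",
      classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)

    // An absolute file:// URI would be needed here; Hadoop does not expand "~".
    // val csv = sc.textFile("file:///absolute/path/to/Casos_Notificados_Dengue_01_2010.csv")
  }
}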

This is my build.sbt:

name := "BigData"

version := "1.0"

scalaVersion := "2.11.2"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.0",
  "org.apache.spark" %% "spark-sql" % "1.3.0",
  "org.apache.hadoop" % "hadoop-common" % "2.4.0"
)

lazy val root = (project in file(".")).
  settings(
    name := "bigdata",
    version := "1.0",
    scalaVersion := "2.11.2",
    mainClass in Compile := Some("com.parser.Main")  
  )

val meta = """META.INF(.)*""".r
assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
  case n if n.startsWith("reference.conf") => MergeStrategy.concat
  case n if n.endsWith(".conf") => MergeStrategy.concat
  case meta(_) => MergeStrategy.discard
  case x => MergeStrategy.first
}
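One thing I'm unsure about is the meta(_) case above: it discards everything under META-INF, and from what I've read that also drops the META-INF/services/org.apache.hadoop.fs.FileSystem service files that register the file:// and hdfs:// implementations. A sketch of a merge strategy that would keep those entries instead (I'm not certain this is the right or complete fix):

val meta = """META.INF(.)*""".r
assemblyMergeStrategy in assembly := {
  // Merge the service registration files line by line instead of discarding
  // them together with the rest of META-INF
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.filterDistinctLines
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
  case n if n.startsWith("reference.conf") => MergeStrategy.concat
  case n if n.endsWith(".conf") => MergeStrategy.concat
  case meta(_) => MergeStrategy.discard
  case x => MergeStrategy.first
}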

Now I'm running sbt assembly and it creates the jar. But when I run it with java -jar, the output is:

 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
 15/06/09 11:15:00 INFO SparkContext: Running Spark version 1.3.0
 15/06/09 11:15:00 WARN Utils: Your hostname, MiddleEarth resolves to a loopback address: 127.0.1.1; using 192.168.43.56 instead (on interface wlan0)
 15/06/09 11:15:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
 15/06/09 11:15:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 15/06/09 11:15:01 INFO SecurityManager: Changing view acls to: sathler
 15/06/09 11:15:01 INFO SecurityManager: Changing modify acls to: sathler
 15/06/09 11:15:01 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sathler); users with modify permissions: Set(sathler)
 15/06/09 11:15:01 INFO Slf4jLogger: Slf4jLogger started
 15/06/09 11:15:01 INFO Remoting: Starting remoting
 15/06/09 11:15:01 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.43.56:51456]
 15/06/09 11:15:01 INFO Utils: Successfully started service 'sparkDriver' on port 51456.
 15/06/09 11:15:01 INFO SparkEnv: Registering MapOutputTracker
 15/06/09 11:15:01 INFO SparkEnv: Registering BlockManagerMaster
 15/06/09 11:15:01 INFO DiskBlockManager: Created local directory at /tmp/spark-2b0531d5-1c1f-42a8-b7c5-32359eff8c17/blockmgr-9b029fcb-2bb3-43fa-bd0e-82b14ba192ef
 15/06/09 11:15:01 INFO MemoryStore: MemoryStore started with capacity 947.7 MB
 15/06/09 11:15:01 INFO HttpFileServer: HTTP File server directory is /tmp/spark-20cccbf0-298a-435d-821a-62f7178d6944/httpd-31dffd5c-1c5f-4c3f-ad58-7bc5d5ce8ad8
 15/06/09 11:15:01 INFO HttpServer: Starting HTTP Server
 15/06/09 11:15:01 INFO Server: jetty-8.y.z-SNAPSHOT
 15/06/09 11:15:01 INFO AbstractConnector: Started SocketConnector@0.0.0.0:52096
 15/06/09 11:15:01 INFO Utils: Successfully started service 'HTTP file server' on port 52096.
 15/06/09 11:15:01 INFO SparkEnv: Registering OutputCommitCoordinator
 15/06/09 11:15:01 INFO Server: jetty-8.y.z-SNAPSHOT
 15/06/09 11:15:01 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
 15/06/09 11:15:01 INFO Utils: Successfully started service 'SparkUI' on port 4040.
 15/06/09 11:15:01 INFO SparkUI: Started SparkUI at http://192.168.43.56:4040
 15/06/09 11:15:02 INFO Executor: Starting executor ID <driver> on host localhost
 15/06/09 11:15:02 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@192.168.43.56:51456/user/HeartbeatReceiver
 15/06/09 11:15:02 INFO NettyBlockTransferService: Server created on 53107
 15/06/09 11:15:02 INFO BlockManagerMaster: Trying to register BlockManager
 15/06/09 11:15:02 INFO BlockManagerMasterActor: Registering block manager localhost:53107 with 947.7 MB RAM, BlockManagerId(<driver>, localhost, 53107)
 15/06/09 11:15:02 INFO BlockManagerMaster: Registered BlockManager
 15/06/09 11:15:02 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
 Exception in thread "main" java.io.IOException: No FileSystem for scheme: file
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2385)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2392)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:167)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:352)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.spark.SparkHadoopWriter$.createPathFromString(SparkHadoopWriter.scala:218)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:953)
at org.apache.spark.rdd.SequenceFileRDDFunctions.saveAsSequenceFile(SequenceFileRDDFunctions.scala:103)
at org.apache.spark.rdd.RDD.saveAsObjectFile(RDD.scala:1317)
at com.parser.Main$.main(Main.scala:17)
at com.parser.Main.main(Main.scala) 

So, I've tried many solutions I found on the internet, but none of them seem to work. I couldn't tell whether the problem was that the program couldn't find the CSV file or that something was wrong with the context. So I created code#2, which is simpler:

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}


package com.parser{

    object Main {

        def main(args: Array[String]) {

            val conf = new SparkConf().setAppName("BigData").setMaster("local[*]").set("spark.executor.memory", "2g") //.setMaster("spark://myhost:7077")
            val sc = new SparkContext(conf)

            val rdd = sc.emptyRDD[Int]
            rdd.saveAsObjectFile("test.txt")

        }
    }
}

But it gives the same error. So I think the problem is with the context. Does anyone know how to solve it? (That is, how to save the file in code#2 and read the file in code#1.)
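To be concrete, "solved" for me would mean that both of these work from the assembled jar against the local file system (assuming sc is the SparkContext from code#2; the paths are just placeholders):

// as in code#2: write an RDD to the local file system
sc.emptyRDD[Int].saveAsObjectFile("file:///tmp/bigdata-test")

// as in code#1: read a CSV back from the local file system
val csv = sc.textFile("file:///tmp/some.csv")
println(csv.count())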

PS: This will run on a single computer for now.
