I get an error when I try to compile, test and run a JUnit test.
I want to load a local Avro file using DataFrames but I am getting an exception:
org.xerial.snappy.SnappyError: [FAILED_TO_LOAD_NATIVE_LIBRARY] null
I am not using Cassandra at all. The versions of the jars involved are:
<properties>
<!-- Generic properties -->
<java.version>1.7</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- Dependency versions -->
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<scala.version>2.10.4</scala.version>
<junit.version>4.11</junit.version>
<slf4j.version>1.7.12</slf4j.version>
<spark.version>1.5.0-cdh5.5.2</spark.version>
<databricks.version>1.5.0</databricks.version>
<json4s-native.version>3.5.0</json4s-native.version>
<spark-avro.version>2.0.1</spark-avro.version>
</properties>
And these are the dependencies:
<dependencies>
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-native_2.10</artifactId>
<version>${json4s-native.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>${databricks.version}</version>
<exclusions>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.0.4.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_2.10</artifactId>
<version>${spark-avro.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
I have tried to compile the project with
mvn clean install -Dorg.xerial.snappy.lib.name=libsnappyjava.jnlib -Dorg.xerial.snappy.tempdir=/tmp
having copied the jar into /tmp beforehand, with no luck.
$ ls -lt /tmp/
total 1944
...27 dic 13:01 snappy-java-1.0.4.jar
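A quick way to check which snappy-java jar the test JVM really loads is to ask the class loader where `org.xerial.snappy.Snappy` comes from. The sketch below is only an illustration: `ClasspathProbe` and `locate` are names invented here, and it assumes the check is run from inside the failing test. It loads the class without initializing it, so the lookup itself should not trigger the native-library loading.

```scala
object ClasspathProbe {
  // Hypothetical helper: reports the jar or directory a class is loaded from.
  def locate(className: String): String =
    try {
      // initialize = false: find the class without running its static initializers
      val cls = Class.forName(className, false, getClass.getClassLoader)
      val source = cls.getProtectionDomain.getCodeSource
      if (source == null) s"$className: no code source (JDK/bootstrap class)"
      else s"$className: ${source.getLocation}"
    } catch {
      case _: ClassNotFoundException => s"$className: not on the classpath"
    }
}
```

Calling `println(ClasspathProbe.locate("org.xerial.snappy.Snappy"))` at the start of the test would show whether the 1.0.4.1 jar declared in the pom, or some other version, is the one on the classpath.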
This is the code:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}
import com.databricks.spark.avro._
import java.io._
//auxiliary function
def readRawData(pathToResources: String, sqlContext: SQLContext, rawFormat: String = "json"): DataFrame = {
val a: DataFrame = rawFormat match {
case "avro" => sqlContext.read.avro(pathToResources)
case "json" => sqlContext.read.json(pathToResources)
case _ => throw new Exception("Format not supported, use AVRO or JSON instead.")
}
//keep only the rows of type 'data', then select the columns of interest from them
val b: DataFrame = a.filter("extraData.type = 'data'")
val c: DataFrame = b.select("extraData.topic", "extraData.timestamp",
"extraData.sha1Hex", "extraData.filePath", "extraData.fileName",
"extraData.lineNumber", "extraData.type",
"message")
val indexForMessage: Int = c.schema.fieldIndex("message")
val result: RDD[Row] = c.rdd.filter(r => !r.anyNull).flatMap(r => {
val metadata: String = r.toSeq.slice(0, indexForMessage).mkString(",")
val lines = r.getString(indexForMessage).split("\n")
lines.map(l => Row.fromSeq(metadata.split(",").toSeq ++ Seq(l)))
})
sqlContext.createDataFrame(result, c.schema)
}//readRawData
def validate(rawFlumeData : String = "FlumeData.1482407196579",fileNamesToBeDigested : String = "fileNames-to-be-digested.txt", sqlContext: SQLContext,sc:SparkContext) : Boolean = {
val result : Boolean = true
sqlContext.sparkContext.hadoopConfiguration.set("avro.mapred.ignore.inputs.without.extension", "false")
val rawDF : DataFrame = readRawData(rawFlumeData, sqlContext, rawFormat = "avro")
rawDF.registerTempTable("RAW")
//this line provokes the exception! cannot load snappy jar file!
val arrayRows : Array[org.apache.spark.sql.Row] = sqlContext.sql("SELECT distinct fileName as filenames FROM RAW GROUP BY fileName").collect()
val arrayFileNames : Array[String] = arrayRows.map(row=>row.getString(0))
val fileNamesDigested = "fileNames-AVRO-1482407196579.txt"
val pw = new PrintWriter(new File(fileNamesDigested))
for (filename <-arrayFileNames) pw.write(filename + "\n")
pw.close
val searchListToBeDigested : org.apache.spark.rdd.RDD[String] = sc.textFile(fileNamesToBeDigested)
//create a map with values like these: Map(EUR_BACK_SWVOL_SMILE_GBP_20160930.csv -> 0, UK_SC_equities_20160930.csv -> 14,...
//val mapFileNamesToBeDigested: Map[String, Long] = searchListToBeDigested.zipWithUniqueId().collect().toMap
val searchFilesAVRODigested = sc.textFile(fileNamesDigested)
val mapFileNamesAVRODigested: Map[String, Long] = searchFilesAVRODigested.zipWithUniqueId().collect().toMap
val pwResults = new PrintWriter(new File("validation-results.txt"))
//The result has to be saved in a text file, somewhere...
val buffer = StringBuilder.newBuilder
//Bring the results to the driver.
val listFilesToBeDigested = searchListToBeDigested.map {line =>
val resultTemp = mapFileNamesAVRODigested.getOrElse(line,"NOT INGESTED!")
var resul = ""
if (resultTemp == "NOT INGESTED!"){
resul = "File " + line + " " + resultTemp + "\n"
}
else{
resul = "File " + line + " " + " is INGESTED!" + "\n"
}
resul
}.collect()
//add the data to the buffer
listFilesToBeDigested.foreach(buffer.append(_))
//save the buffer contents to the output text file.
pwResults.write(buffer.toString)
pwResults.close
//this boolean must be false in case of an exception or error...
result
}//
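The comment near the end of `validate` says the result should be false when something goes wrong, but as written `result` is always `true`. A minimal sketch of one way to get that behaviour, using only plain Scala collections and `scala.util.Try`, is shown here. `ValidationSketch`, `report`, `writeLines` and `succeeded` are names made up for this illustration, and the Spark parts are deliberately left out.

```scala
import java.io.{File, PrintWriter}
import scala.util.{Failure, Success, Try}

object ValidationSketch {
  // Pure comparison step: one report line per expected file name.
  def report(expected: Seq[String], ingested: Set[String]): Seq[String] =
    expected.map { name =>
      if (ingested.contains(name)) s"File $name is INGESTED!"
      else s"File $name NOT INGESTED!"
    }

  // Writes one line per entry and closes the writer even if writing fails.
  def writeLines(path: String, lines: Seq[String]): Unit = {
    val pw = new PrintWriter(new File(path))
    try lines.foreach(l => pw.write(l + "\n")) finally pw.close()
  }

  // Runs the body and turns a non-fatal failure into `false` instead of a thrown exception.
  def succeeded(body: => Unit): Boolean = Try(body) match {
    case Success(_) => true
    case Failure(e) =>
      System.err.println(s"Validation failed: $e")
      false
  }
}
```

With helpers like these, the body of `validate` could be wrapped as `ValidationSketch.succeeded { ... }` and that value returned, so the JUnit assertion fails with a message instead of the test aborting on the exception.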
This is the unit test code:
private[validation] class ValidateInputCSVFilesTest {
//AS YOU CAN SEE, I do not WANT to use snappy at all!
val conf = new SparkConf()
.setAppName("ValidateInputCSVFilesTest")
.setMaster("local[2]")
.set("spark.driver.allowMultipleContexts", "true")
.set("spark.driver.host", "127.0.0.1")
.set("spark.io.compression.codec", "lzf")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val properties : Properties = new Properties()
properties.setProperty("frtb.input.csv.validation.avro","./src/test/resources/avro/FlumeData.1482407196579")
properties.setProperty("frtb.input.csv.validation.list.files","./src/test/resources/fileNames-to-be-digested.txt")
import sqlContext.implicits._
sqlContext.sparkContext.hadoopConfiguration.set("avro.mapred.ignore.inputs.without.extension", "false")
@Test
def testValidateInputFiles() = {
//def validate(rawFlumeData : String = "FlumeData.1482407196579",fileNamesToBeDigested : String = "fileNames-to-be-digested.txt", sqlContext: SQLContext)
val rawFlumeData = properties.getProperty("frtb.input.csv.validation.avro")
val fileNamesToBeDigested = properties.getProperty("frtb.input.csv.validation.list.files")
println("rawFlumeData is " + rawFlumeData )
println("fileNamesToBeDigested is " + fileNamesToBeDigested )
val result : Boolean = ValidateInputCSVFiles.validate(rawFlumeData ,fileNamesToBeDigested ,sqlContext,sc)
Assert.assertTrue("Must be true...",result)
}//end of test method
}//end of unit class
The same code runs without problems in a local spark-shell, using this command:
$ bin/spark-shell --packages org.json4s:json4s-native_2.10:3.5.0 --packages com.databricks:spark-csv_2.10:1.5.0 --packages com.databricks:spark-avro_2.10:2.0.1
What else can I do?
Thanks in advance.
The problem was solved when I changed the scope of the Spark dependencies.
This is the part of the pom.xml that solved my problem; now I can run the job with the spark-submit command...
...
...