Input file:

___DATE___

2018-11-16T06:3937
Linux hortonworks 3.10.0-514.26.2.el7.x86_64 #1 SMP Fri Jun 30 05:26:04 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
 06:39:37 up 100 days,  1:04, 2 users,  load average: 9.01, 8.30, 8.48
06:30:01 AM     all      6.08      0.00      2.83      0.04      0.00     91.06

___DATE___

2018-11-16T06:4037
Linux cloudera 3.10.0-514.26.2.el7.x86_64 #1 SMP Fri Jun 30 05:26:04 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
 06:40:37 up 100 days,  1:05, 28 users,  load average: 8.39, 8.26, 8.45
06:40:01 AM     all      6.92      1.11      1.88      0.04      0.00     90.05

Required output:

2018-11-16T06:3937,hortonworks, 2 users
2018-11-16T06:4037,cloudera, 28 users

I'm trying to get my hands on Spark with Scala. Trying to parse this input file with Spark 2.3.1 and scala 2.11.6 . Here's my code.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.LongWritable
import org.apache.spark.{SparkConf, SparkContext}

object parse_stats extends App {

  case class LoadSchema(date:String)

  val conf = new SparkConf().setAppName("ParseStats").setMaster("local[*]")
  val sc = new SparkContext(conf)

  val hadoopConf = new Configuration(sc.hadoopConfiguration)
  hadoopConf.set("textinputformat.record.delimiter","___DATE___")

  val input = sc.newAPIHadoopFile("C:\\Users\\rohit\\Documents\\dataset\\sys_stats.log",classOf[TextInputFormat],classOf[LongWritable],classOf[Text],hadoopConf).map(line=>line._2.toString)

  lazy val date_pattern="[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
  lazy val uname_pattern="[Linux][0-9a-zA-z-#() . : _ /]+[GNU/Linux]".r
  lazy val cpu_regex="[ 0-9]+[:][0-9]+[:][0-9]+[0-9a-zA-Z, : .]+[load average][:][0-9 . ,]+".r

  val transformRDD = input.map{eachline=>((if(date_pattern.pattern.matcher(eachline).matches()) eachline), //collects date
    (if(uname_pattern.pattern.matcher(eachline).matches()) eachline.split("\\s+")(1).trim() ), //collects hostname
    (if (cpu_regex.pattern.matcher(eachline).matches()) eachline.split(",")(2).trim()) //collects cpu users
  )
  }

  transformRDD.collect().foreach(println)
}

If run this code from Intellij, I get below output.

((),(),())
((),(),())
((),(),())

If I run from spark-shell, I get below error:

scala> import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.conf.Configuration

scala> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

scala> import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Text

scala> import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.LongWritable

scala> import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext}

scala>   val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml

scala>   hadoopConf.set("textinputformat.record.delimiter","___DATE___")

scala>   val input = sc.newAPIHadoopFile("C:\\Users\\rnimmal1\\Documents\\dataset\\sys_stats.log",classOf[TextInputFormat],classOf[LongWritable],classOf[Text],hadoopConf).map(line=>line._2.toString)
input: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at map at <console>:37

scala>

scala>   lazy val date_pattern="[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
date_pattern: scala.util.matching.Regex = <lazy>

scala>   lazy val uname_pattern="[Linux][0-9a-zA-z-#() . : _ /]+[GNU/Linux]".r
uname_pattern: scala.util.matching.Regex = <lazy>

scala>   lazy val cpu_regex="[ 0-9]+[:][0-9]+[:][0-9]+[0-9a-zA-Z, : .]+[load average][:][0-9 . ,]+".r
cpu_regex: scala.util.matching.Regex = <lazy>

scala>

scala>   val transformRDD = input.map{eachline=>((if(date_pattern.pattern.matcher(eachline).matches()) eachline), //collects date
     |     (if(uname_pattern.pattern.matcher(eachline).matches()) eachline.split("\\s+")(1).trim() ), //collects hostname
     |     (if (cpu_regex.pattern.matcher(eachline).matches()) eachline.split(",")(2).trim()) //collects cpu users
     |   )
     |   }
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:371)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.map(RDD.scala:370)
  ... 54 elided
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
Serialization stack:
        - object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml)
        - field (class: $iw, name: hadoopConf, type: class org.apache.hadoop.conf.Configuration)
        - object (class $iw, $iw@63fa0b9)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@3f4b52fa)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@338f9bb5)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@3d63becf)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@3aca7082)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@4ccfd904)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@6e4e7a62)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@5aaab2b0)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@5c51a7eb)
        - field (class: $line36.$read, name: $iw, type: class $iw)
        - object (class $line36.$read, $line36.$read@2ba3b4a6)
        - field (class: $iw, name: $line36$read, type: class $line36.$read)
        - object (class $iw, $iw@6559f04e)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@8f7cbcc)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@465b16bb)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@373efaa2)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@5f2896fa)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@f777d41)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@43ec41d7)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@61c0a61)
        - field (class: $line38.$read, name: $iw, type: class $iw)
        - object (class $line38.$read, $line38.$read@10d1f6da)
        - field (class: $iw, name: $line38$read, type: class $line38.$read)
        - object (class $iw, $iw@2095e085)
        - field (class: $iw, name: $outer, type: class $iw)
        - object (class $iw, $iw@380cb7e3)
        - field (class: $anonfun$1, name: $outer, type: class $iw)
        - object (class $anonfun$1, <function1>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
  ... 63 more

What am I missing?

2

There are 2 answers

4
stack0114106 On BEST ANSWER

After changing the __DATA__ to pipe "|" , the below code snippet produces the required output. Note that I have used Windows platform, so I'm replacing "\r". Please check out

val spark = SparkSession.builder().appName("Spark_test").master("local[*]").getOrCreate()

import spark.implicits._

val file1 = spark.sparkContext.textFile("./in/machine_logs.txt")

spark.sparkContext.hadoopConfiguration.set("textinputformat.record.delimiter","|")

val file2 = file1.filter( line => { val x = line.split("""\n"""); x.length > 5 } )
                    .map( line => { val x = line.split("""\n""")
                      val p = x(2).replaceAll("\\r","") // not needed if Unix platform
                      val q = x(3).split(" ")(1)
                      val r = x(4).split(",")(2)
                      (p + "," + q + "," + r)
                    } )

file2.collect.foreach(println)
//file2.saveAsTextFile("./in/machine_logs.out") --> comment above line and uncomment this line to save in file

Output:

2018-11-16T06:3937,hortonworks, 2 users
2018-11-16T06:4037,cloudera, 28 users

Update1:

With regex matching:

val date_pattern="[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
val uname_pattern="(Linux) (.*?) [0-9a-zA-z-#() . : _ /]+(GNU/Linux)".r
val cpu_regex="""(.+),(.*?),\s+(load average)[:][0-9 . ,]+""".r
val file2 = file1.filter( line => { val x = line.split("""\n"""); x.length > 5 } )
  .map( line => {
          var q = ""; var r = "";
          val p = date_pattern.findFirstIn(line).mkString
          uname_pattern.findAllIn(line).matchData.foreach(m=> {q = m.group(2).mkString} )
          cpu_regex.findAllIn(line).matchData.foreach(m=> {r = m.group(2).mkString} )
          (p + "," + q + "," + r)
  } )
file2.collect.foreach(println)
0
MBillau On

I believe the problem is that you are defining those filters objects (date_pattern) outside of the RDD, so Spark has to send the entire parse_stats object to all of the executors, which it cannot do because it cannot serialize that entire object. This doesn't happen when you run it in local mode because it doesn't need to send any objects to other executors.

Check out the great answer here: Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects

This gist has some quick and simple ways to avoid serialization: https://gist.github.com/kmader/1d64e64621e63d566f67