How to add current_timestamp() column to a streaming dataframe?


I'm using Spark 2.4.3 and Scala.

I'm fetching messages from a streaming Kafka source with the following structure:

{"message": "Jan 7 17:53:48 PA-850.abc.com 1,2020/01/07 17:53:41,001801012404,TRAFFIC,drop,2304,2020/01/07 17:53:41,10.7.26.51,10.8.3.11,0.0.0.0,0.0.0.0,interzone-default,,,not-applicable,vsys1,SERVER-VLAN,VPN,ethernet1/6.18,,test-1,2020/01/07 17:53:41,0,1,45194,514,0,0,0x0,udp,deny,588,588,0,1,2020/01/07 17:53:45,0,any,0,35067255521,0x8000000000000000,10.0.0.0-10.255.255.255,10.0.0.0-10.255.255.255,0,1,0,policy-deny,0,0,0,0,,PA-850,from-policy,,,0,,0,,N/A,0,0,0,0,b804eab2-f240-467a-be97-6f8c382afd4c,0","source_host": "10.7.26.51"}

My goal is to add a new column holding the current timestamp to each row of the streaming data, and then insert all of these rows into a Cassandra table.

package devices

import configurations._
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, from_json, lower, split}
import org.apache.spark.sql.cassandra._
import scala.collection.mutable.{ListBuffer, Map}
import scala.io.Source
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType,TimestampType}
import org.apache.spark.sql.functions.to_timestamp
import org.apache.spark.sql.functions.unix_timestamp

object PA {

  def main(args: Array[String]): Unit = {

    val spark = SparkBuilder.spark

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", configHelper.kafka_host)
      .option("subscribe", configHelper.log_topic)
      .option("startingOffsets", "earliest")
      .option("multiLine", true)
      .option("includeTimestamp",true)
      .load()

    df.printSchema()  

    def getDeviceNameOSMapping(): Map[String, String] = {
      val osmapping = scala.collection.mutable.Map[String, String]()
      val bufferedSource = Source.fromFile(configHelper.os_mapping_file)
      for (line <- bufferedSource.getLines) {
        val cols = line.split(",").map(_.trim)
        osmapping += (cols(0).toLowerCase() -> cols(1).toLowerCase())
      }
      bufferedSource.close
      osmapping
    }

    val deviceOSMapping = spark.sparkContext.broadcast(getDeviceNameOSMapping())

    val debug = true

    val msg = df.selectExpr("CAST(value AS STRING)")
      .withColumn("value", lower(col("value")))
      .select(from_json(col("value"), cefFormat.cef_json).as("data"))
      .select("data.*")

    import spark.sqlContext.implicits._  

    val newDF = msg.withColumn("created", lit(current_timestamp()))

    msg.writeStream
      .foreachBatch { (batchDF, _) =>

        val syslogDF = batchDF
          .filter(!$"message".contains("invalid syslog message:"))
          .filter(!$"message".contains("fluentd worker is now stopping worker=0"))
          .filter(!$"message".contains("fluentd worker is now running worker=0"))

        val syslogRDD = syslogDF.rdd
          .map(r => r.getString(0))
          .map(x => parseSysLog(x))
          .filter(x => deviceOSMapping.value.contains(x._1))

        try {
          val threat_9_0_DF = spark.sqlContext.createDataFrame(
            syslogRDD
              .filter(x => deviceOSMapping.value(x._1).equals("9.0") & x._2.equals("threat"))
              .map(x => Row.fromSeq(x._3)),
            formatPA.threat_9_0)

          if (debug)
            threat_9_0_DF.show(true)

          threat_9_0_DF.write
            .cassandraFormat(configHelper.cassandra_table_syslog, configHelper.cassandra_keyspace)
            .mode("append")
            .save

          println("threat_9_0_DF saved")
        } catch {
          case e: Exception =>
            println(e.getMessage)
        }

        try {
          val traffic_9_0_DF = spark.sqlContext.createDataFrame(
            syslogRDD
              .filter(x => deviceOSMapping.value(x._1).equals("9.0") & x._2.equals("traffic"))
              .map(x => Row.fromSeq(x._3)),
            formatPA.traffic_9_0)

          if (debug)
            traffic_9_0_DF.show(true)

          traffic_9_0_DF.write
            .cassandraFormat(configHelper.cassandra_table_syslog, configHelper.cassandra_keyspace)
            .mode("append")
            .save

          println("traffic_9_0_DF saved")
        } catch {
          case e: Exception =>
            println(e.getMessage)
        }

      }.start().awaitTermination()

      def parseSysLog(msg: String): (String, String, List[String]) = {
        //println("PRINTING MESSAGES")
        //println(msg)
        val splitmsg = msg.split(",")
        val traffic_type = splitmsg(3)
        val temp = splitmsg(0).split(" ")
        val date_time = temp.dropRight(2).mkString(" ")
        val domain_name = temp(temp.size - 2)
        val future_use1 = temp(temp.size - 1)
        val device_name = domain_name.split("\\.")(0)

        var result = new ListBuffer[String]()
        result += date_time
        result += domain_name
        result += future_use1
        result = result ++ splitmsg.slice(1, splitmsg.size).toList

        (device_name, traffic_type, result.toList)
      }
  }
}

    package configurations

    import org.apache.spark.sql.types.{StringType, StructType, TimestampType, DateType}

    object formatPA {

    val threat_9_0=new StructType()
            .add("date_time",StringType)
            .add("log_source",StringType)
            .add("future_use1",StringType)
            .add("received_time",StringType)
            .add("serial_number",StringType)
            .add("traffic_type",StringType)
            .add("threat_content_type",StringType)
            .add("future_use2",StringType)
            .add("generated_time",StringType)
            .add("src_ip",StringType)
            .add("dst_ip",StringType)
            .add("src_nat",StringType)
            .add("dst_nat",StringType)
            .add("rule_name",StringType)
            .add("src_user",StringType)
            .add("dst_user",StringType)
            .add("app",StringType)
            .add("vsys",StringType)
            .add("src_zone",StringType)
            .add("dst_zone",StringType)
            .add("igr_int",StringType)
            .add("egr_int",StringType)
            .add("log_fw_profile",StringType)
            .add("future_use3",StringType)
            .add("session_id",StringType)
            .add("repeat_count",StringType)
            .add("src_port",StringType)
            .add("dst_port",StringType)
            .add("src_nat_port",StringType)
            .add("dst_nat_port",StringType)
            .add("flags",StringType)
            .add("protocol",StringType)
            .add("action",StringType)
            .add("miscellaneous",StringType)
            .add("threat_id",StringType)
            .add("category",StringType)
            .add("severity",StringType)
            .add("direction",StringType)
            .add("seq_num",StringType)
            .add("act_flag",StringType)
            .add("src_geo_location",StringType)
            .add("dst_geo_location",StringType)
            .add("future_use4",StringType)
            .add("content_type",StringType)
            .add("pcap_id",StringType)
            .add("file_digest",StringType)
            .add("apt_cloud",StringType)
            .add("url_index",StringType)
            .add("user_agent",StringType)
            .add("file_type",StringType)
            .add("x_forwarded_for",StringType)
            .add("referer",StringType)
            .add("sender",StringType)
            .add("subject",StringType)
            .add("recipient",StringType)
            .add("report_id",StringType)
            .add("dghl1",StringType)
            .add("dghl2",StringType)
            .add("dghl3",StringType)
            .add("dghl4",StringType)
            .add("vsys_name",StringType)
            .add("device_name",StringType)
            .add("future_use5",StringType)
            .add("src_vm_uuid",StringType)
            .add("dst_vm_uuid",StringType)
            .add("http_method",StringType)
            .add("tunnel_id_imsi",StringType)
            .add("monitor_tag_imei",StringType)
            .add("parent_session_id",StringType)
            .add("parent_start_time",StringType)
            .add("tunnel_type",StringType)
            .add("threat_category",StringType)
            .add("content_version",StringType)
            .add("future_use6",StringType)
            .add("sctp_association_id",StringType)
            .add("payload_protocol_id",StringType)
            .add("http_headers",StringType)
            .add("url_category_list",StringType)
            .add("uuid_for_rule",StringType)
            .add("http_2_connection",StringType)
            .add("created",TimestampType)

        val traffic_9_0=new StructType()
            .add("date_time",StringType)
            .add("log_source",StringType)
            .add("future_use1",StringType)
            .add("received_time",StringType)
            .add("serial_number",StringType)
            .add("traffic_type",StringType)
            .add("threat_content_type",StringType)
            .add("future_use2",StringType)
            .add("generated_time",StringType)
            .add("src_ip",StringType)
            .add("dst_ip",StringType)
            .add("src_nat",StringType)
            .add("dst_nat",StringType)
            .add("rule_name",StringType)
            .add("src_user",StringType)
            .add("dst_user",StringType)
            .add("app",StringType)
            .add("vsys",StringType)
            .add("src_zone",StringType)
            .add("dst_zone",StringType)
            .add("igr_int",StringType)
            .add("egr_int",StringType)
            .add("log_fw_profile",StringType)
            .add("future_use3",StringType)
            .add("session_id",StringType)
            .add("repeat_count",StringType)
            .add("src_port",StringType)
            .add("dst_port",StringType)
            .add("src_nat_port",StringType)
            .add("dst_nat_port",StringType)
            .add("flags",StringType)
            .add("protocol",StringType)
            .add("action",StringType)
            .add("bytes",StringType)
            .add("bytes_sent",StringType)
            .add("bytes_received",StringType)
            .add("packets",StringType)
            .add("start_time",StringType)
            .add("end_time",StringType)
            .add("category",StringType)
            .add("future_use4",StringType)
            .add("seq_num",StringType)
            .add("act_flag",StringType)
            .add("src_geo_location",StringType)
            .add("dst_geo_location",StringType)
            .add("future_use5",StringType)
            .add("packet_sent",StringType)
            .add("packet_received",StringType)
            .add("session_end_reason",StringType)
            .add("dghl1",StringType)
            .add("dghl2",StringType)
            .add("dghl3",StringType)
            .add("dghl4",StringType)
            .add("vsys_name",StringType)
            .add("device_name",StringType)
            .add("action_source",StringType)
            .add("src_vm_uuid",StringType)
            .add("dst_vm_uuid",StringType)
            .add("tunnel_id_imsi",StringType)
            .add("monitor_tag_imei",StringType)
            .add("parent_session_id",StringType)
            .add("parent_start_time",StringType)
            .add("tunnel_type",StringType)
            .add("sctp_association_id",StringType)
            .add("sctp_chunks",StringType)
            .add("sctp_chunks_sent",StringType)
            .add("sctp_chunks_received",StringType)
            .add("uuid_for_rule",StringType)
            .add("http_2_connection",StringType)
            .add("created",TimestampType)    
}

The output for the above code is as follows:

+---------+----------+-----------+-------------+-------------+------------+-------------------+-----------+--------------+------+------+-------+-------+---------+--------+--------+---+----+--------+--------+-------+-------+--------------+-----------+----------+------------+--------+--------+------------+------------+-----+--------+------+-------------+---------+--------+--------+---------+-------+--------+----------------+----------------+-----------+------------+-------+-----------+---------+---------+----------+---------+---------------+-------+------+-------+---------+---------+-----+-----+-----+-----+---------+-----------+-----------+-----------+-----------+-----------+--------------+----------------+-----------------+-----------------+-----------+---------------+---------------+-----------+-------------------+-------------------+------------+-----------------+-------------+-----------------+-------+
                |date_time|log_source|future_use1|received_time|serial_number|traffic_type|threat_content_type|future_use2|generated_time|src_ip|dst_ip|src_nat|dst_nat|rule_name|src_user|dst_user|app|vsys|src_zone|dst_zone|igr_int|egr_int|log_fw_profile|future_use3|session_id|repeat_count|src_port|dst_port|src_nat_port|dst_nat_port|flags|protocol|action|miscellaneous|threat_id|category|severity|direction|seq_num|act_flag|src_geo_location|dst_geo_location|future_use4|content_type|pcap_id|file_digest|apt_cloud|url_index|user_agent|file_type|x_forwarded_for|referer|sender|subject|recipient|report_id|dghl1|dghl2|dghl3|dghl4|vsys_name|device_name|future_use5|src_vm_uuid|dst_vm_uuid|http_method|tunnel_id_imsi|monitor_tag_imei|parent_session_id|parent_start_time|tunnel_type|threat_category|content_version|future_use6|sctp_association_id|payload_protocol_id|http_headers|url_category_list|uuid_for_rule|http_2_connection|created|
+---------+----------+-----------+-------------+-------------+------------+-------------------+-----------+--------------+------+------+-------+-------+---------+--------+--------+---+----+--------+--------+-------+-------+--------------+-----------+----------+------------+--------+--------+------------+------------+-----+--------+------+-------------+---------+--------+--------+---------+-------+--------+----------------+----------------+-----------+------------+-------+-----------+---------+---------+----------+---------+---------------+-------+------+-------+---------+---------+-----+-----+-----+-----+---------+-----------+-----------+-----------+-----------+-----------+--------------+----------------+-----------------+-----------------+-----------+---------------+---------------+-----------+-------------------+-------------------+------------+-----------------+-------------+-----------------+-------+
                +---------+----------+-----------+-------------+-------------+------------+-------------------+-----------+--------------+------+------+-------+-------+---------+--------+--------+---+----+--------+--------+-------+-------+--------------+-----------+----------+------------+--------+--------+------------+------------+-----+--------+------+-------------+---------+--------+--------+---------+-------+--------+----------------+----------------+-----------+------------+-------+-----------+---------+---------+----------+---------+---------------+-------+------+-------+---------+---------+-----+-----+-----+-----+---------+-----------+-----------+-----------+-----------+-----------+--------------+----------------+-----------------+-----------------+-----------+---------------+---------------+-----------+-------------------+-------------------+------------+-----------------+-------------+-----------------+-------+

                threat_9_0_DF saved
            20/01/08 14:59:49 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
                java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 69
                if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 69, created), TimestampType), true, false) AS created#773
                    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
                    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
                    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
                    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
                    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
                    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
                    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
                    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
                    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
                    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
                    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
                    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
                    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
                    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
                    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
                    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
                    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
                    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
                    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
                    at org.apache.spark.scheduler.Task.run(Task.scala:121)
                    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
                    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
                    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
                    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                    at java.lang.Thread.run(Thread.java:748)
                Caused by: java.lang.ArrayIndexOutOfBoundsException: 69
                    at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:174)
                    at org.apache.spark.sql.Row$class.isNullAt(Row.scala:191)
                    at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:166)
                    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_34$(Unknown Source)
                    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
                    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289)
                    ... 25 more

1 Answer

Answer from Jacek Laskowski:

It looks like it does not matter that the messages are in JSON format, does it?


Let's then use a sample dataset of any schema and add a timestamp column.

val messages = spark.range(3)
scala> messages.printSchema
root
 |-- id: long (nullable = false)

val withTs = messages.withColumn("timestamp", current_timestamp())
scala> withTs.printSchema
root
 |-- id: long (nullable = false)
 |-- timestamp: timestamp (nullable = false)

That gives you a dataset with a timestamp column.
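
The same call works on a streaming Dataset. As a minimal sketch, assuming a running SparkSession named spark, the built-in rate source can stand in for the Kafka source above:

// A minimal sketch, assuming a running SparkSession named `spark`.
// The rate source is only a stand-in for the question's Kafka source.
import org.apache.spark.sql.functions.current_timestamp

val streaming = spark.readStream
  .format("rate")                                // test source that emits (timestamp, value) rows
  .option("rowsPerSecond", 1)
  .load()
  .withColumn("created", current_timestamp())    // same call as on a static Dataset

streaming.printSchema()                          // shows timestamp, value, and the new created column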


The following line in your code should work, too (you don't need lit).

val xDF = thDF.withColumn("created", lit(current_timestamp()))   //This does not get cast to TimestampType

What do you mean by "This does not get cast to TimestampType"? How did you check that? Are you perhaps confusing Spark's TimestampType with Cassandra's timestamp type? The Spark Cassandra connector should handle the conversion.

Let's give that a try:

val litTs = spark.range(3).withColumn("ts", lit(current_timestamp))
scala> litTs.printSchema
root
 |-- id: long (nullable = false)
 |-- ts: timestamp (nullable = false)

import org.apache.spark.sql.types._
val dataType = litTs.schema("ts").dataType
assert(dataType.isInstanceOf[TimestampType])

scala> println(dataType)
TimestampType
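
As for Cassandra, the Spark Cassandra connector maps Spark's TimestampType to Cassandra's timestamp type on write, so a column built with current_timestamp should land as a proper timestamp. A minimal batch-write sketch, assuming the connector is on the classpath and using placeholder keyspace/table names:

// Sketch only: my_keyspace / my_table are placeholders, and the target table is
// assumed to have matching columns (id bigint primary key, created timestamp).
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.functions.current_timestamp

val toSave = spark.range(3).withColumn("created", current_timestamp())

toSave.write
  .cassandraFormat("my_table", "my_keyspace")    // same connector API as in the question
  .mode("append")
  .save()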