I'm using Spark 2.4.3 and Scala.
I'm fetching messages from a streaming Kafka source; each message has the following structure:
{"message": "Jan 7 17:53:48 PA-850.abc.com 1,2020/01/07 17:53:41,001801012404,TRAFFIC,drop,2304,2020/01/07 17:53:41,10.7.26.51,10.8.3.11,0.0.0.0,0.0.0.0,interzone-default,,,not-applicable,vsys1,SERVER-VLAN,VPN,ethernet1/6.18,,test-1,2020/01/07 17:53:41,0,1,45194,514,0,0,0x0,udp,deny,588,588,0,1,2020/01/07 17:53:45,0,any,0,35067255521,0x8000000000000000,10.0.0.0-10.255.255.255,10.0.0.0-10.255.255.255,0,1,0,policy-deny,0,0,0,0,,PA-850,from-policy,,,0,,0,,N/A,0,0,0,0,b804eab2-f240-467a-be97-6f8c382afd4c,0","source_host": "10.7.26.51"}
My goal is to add a new column to each row of the streaming data that holds the current timestamp, and then insert all of these rows into a Cassandra table.
package devices

import configurations._
import org.apache.spark.sql.Row
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.functions._
import scala.collection.mutable.{ListBuffer, Map}
import scala.io.Source

object PA {

  def main(args: Array[String]): Unit = {

    val spark = SparkBuilder.spark

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", configHelper.kafka_host)
      .option("subscribe", configHelper.log_topic)
      .option("startingOffsets", "earliest")
      .option("multiLine", true)
      .option("includeTimestamp", true)
      .load()
    df.printSchema()

    // Loads the device-name -> OS-version mapping from a local CSV file.
    def getDeviceNameOSMapping(): Map[String, String] = {
      val osmapping = scala.collection.mutable.Map[String, String]()
      val bufferedSource = Source.fromFile(configHelper.os_mapping_file)
      for (line <- bufferedSource.getLines) {
        val cols = line.split(",").map(_.trim)
        osmapping += (cols(0).toLowerCase() -> cols(1).toLowerCase())
      }
      bufferedSource.close
      osmapping
    }

    val deviceOSMapping = spark.sparkContext.broadcast(getDeviceNameOSMapping())
    val debug = true

    val msg = df.selectExpr("CAST(value AS STRING)")
      .withColumn("value", lower(col("value")))
      .select(from_json(col("value"), cefFormat.cef_json).as("data"))
      .select("data.*")

    import spark.sqlContext.implicits._

    val newDF = msg.withColumn("created", lit(current_timestamp()))

    msg.writeStream
      .foreachBatch { (batchDF, _) =>
        // Drop fluentd housekeeping messages before parsing.
        val syslogDF = batchDF
          .filter(!$"message".contains("invalid syslog message:"))
          .filter(!$"message".contains("fluentd worker is now stopping worker=0"))
          .filter(!$"message".contains("fluentd worker is now running worker=0"))
        val syslogRDD = syslogDF.rdd
          .map(r => r.getString(0))
          .map(x => parseSysLog(x))
          .filter(x => deviceOSMapping.value.contains(x._1))
        try {
          val threat_9_0_DF = spark.sqlContext.createDataFrame(
            syslogRDD
              .filter(x => deviceOSMapping.value(x._1).equals("9.0") & x._2.equals("threat"))
              .map(x => Row.fromSeq(x._3)),
            formatPA.threat_9_0)
          if (debug)
            threat_9_0_DF.show(true)
          threat_9_0_DF.write
            .cassandraFormat(configHelper.cassandra_table_syslog, configHelper.cassandra_keyspace)
            .mode("append")
            .save
          println("threat_9_0_DF saved")
        } catch {
          case e: Exception =>
            println(e.getMessage)
        }
        try {
          val traffic_9_0_DF = spark.sqlContext.createDataFrame(
            syslogRDD
              .filter(x => deviceOSMapping.value(x._1).equals("9.0") & x._2.equals("traffic"))
              .map(x => Row.fromSeq(x._3)),
            formatPA.traffic_9_0)
          if (debug)
            traffic_9_0_DF.show(true)
          traffic_9_0_DF.write
            .cassandraFormat(configHelper.cassandra_table_syslog, configHelper.cassandra_keyspace)
            .mode("append")
            .save
          println("traffic_9_0_DF saved")
        } catch {
          case e: Exception =>
            println(e.getMessage)
        }
      }
      .start()
      .awaitTermination()

    // Splits one raw syslog line into (device_name, traffic_type, all fields).
    def parseSysLog(msg: String): (String, String, List[String]) = {
      val splitmsg = msg.split(",")
      val traffic_type = splitmsg(3)
      val temp = splitmsg(0).split(" ")
      val date_time = temp.dropRight(2).mkString(" ")
      val domain_name = temp(temp.size - 2)
      val future_use1 = temp(temp.size - 1)
      val device_name = domain_name.split("\\.")(0)
      var result = new ListBuffer[String]()
      result += date_time
      result += domain_name
      result += future_use1
      result = result ++ splitmsg.slice(1, splitmsg.size).toList
      (device_name, traffic_type, result.toList)
    }
  }
}
package configurations

import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

object formatPA {

  val threat_9_0 = new StructType()
    .add("date_time", StringType)
    .add("log_source", StringType)
    .add("future_use1", StringType)
    .add("received_time", StringType)
    .add("serial_number", StringType)
    .add("traffic_type", StringType)
    .add("threat_content_type", StringType)
    .add("future_use2", StringType)
    .add("generated_time", StringType)
    .add("src_ip", StringType)
    .add("dst_ip", StringType)
    .add("src_nat", StringType)
    .add("dst_nat", StringType)
    .add("rule_name", StringType)
    .add("src_user", StringType)
    .add("dst_user", StringType)
    .add("app", StringType)
    .add("vsys", StringType)
    .add("src_zone", StringType)
    .add("dst_zone", StringType)
    .add("igr_int", StringType)
    .add("egr_int", StringType)
    .add("log_fw_profile", StringType)
    .add("future_use3", StringType)
    .add("session_id", StringType)
    .add("repeat_count", StringType)
    .add("src_port", StringType)
    .add("dst_port", StringType)
    .add("src_nat_port", StringType)
    .add("dst_nat_port", StringType)
    .add("flags", StringType)
    .add("protocol", StringType)
    .add("action", StringType)
    .add("miscellaneous", StringType)
    .add("threat_id", StringType)
    .add("category", StringType)
    .add("severity", StringType)
    .add("direction", StringType)
    .add("seq_num", StringType)
    .add("act_flag", StringType)
    .add("src_geo_location", StringType)
    .add("dst_geo_location", StringType)
    .add("future_use4", StringType)
    .add("content_type", StringType)
    .add("pcap_id", StringType)
    .add("file_digest", StringType)
    .add("apt_cloud", StringType)
    .add("url_index", StringType)
    .add("user_agent", StringType)
    .add("file_type", StringType)
    .add("x_forwarded_for", StringType)
    .add("referer", StringType)
    .add("sender", StringType)
    .add("subject", StringType)
    .add("recipient", StringType)
    .add("report_id", StringType)
    .add("dghl1", StringType)
    .add("dghl2", StringType)
    .add("dghl3", StringType)
    .add("dghl4", StringType)
    .add("vsys_name", StringType)
    .add("device_name", StringType)
    .add("future_use5", StringType)
    .add("src_vm_uuid", StringType)
    .add("dst_vm_uuid", StringType)
    .add("http_method", StringType)
    .add("tunnel_id_imsi", StringType)
    .add("monitor_tag_imei", StringType)
    .add("parent_session_id", StringType)
    .add("parent_start_time", StringType)
    .add("tunnel_type", StringType)
    .add("threat_category", StringType)
    .add("content_version", StringType)
    .add("future_use6", StringType)
    .add("sctp_association_id", StringType)
    .add("payload_protocol_id", StringType)
    .add("http_headers", StringType)
    .add("url_category_list", StringType)
    .add("uuid_for_rule", StringType)
    .add("http_2_connection", StringType)
    .add("created", TimestampType)

  val traffic_9_0 = new StructType()
    .add("date_time", StringType)
    .add("log_source", StringType)
    .add("future_use1", StringType)
    .add("received_time", StringType)
    .add("serial_number", StringType)
    .add("traffic_type", StringType)
    .add("threat_content_type", StringType)
    .add("future_use2", StringType)
    .add("generated_time", StringType)
    .add("src_ip", StringType)
    .add("dst_ip", StringType)
    .add("src_nat", StringType)
    .add("dst_nat", StringType)
    .add("rule_name", StringType)
    .add("src_user", StringType)
    .add("dst_user", StringType)
    .add("app", StringType)
    .add("vsys", StringType)
    .add("src_zone", StringType)
    .add("dst_zone", StringType)
    .add("igr_int", StringType)
    .add("egr_int", StringType)
    .add("log_fw_profile", StringType)
    .add("future_use3", StringType)
    .add("session_id", StringType)
    .add("repeat_count", StringType)
    .add("src_port", StringType)
    .add("dst_port", StringType)
    .add("src_nat_port", StringType)
    .add("dst_nat_port", StringType)
    .add("flags", StringType)
    .add("protocol", StringType)
    .add("action", StringType)
    .add("bytes", StringType)
    .add("bytes_sent", StringType)
    .add("bytes_received", StringType)
    .add("packets", StringType)
    .add("start_time", StringType)
    .add("end_time", StringType)
    .add("category", StringType)
    .add("future_use4", StringType)
    .add("seq_num", StringType)
    .add("act_flag", StringType)
    .add("src_geo_location", StringType)
    .add("dst_geo_location", StringType)
    .add("future_use5", StringType)
    .add("packet_sent", StringType)
    .add("packet_received", StringType)
    .add("session_end_reason", StringType)
    .add("dghl1", StringType)
    .add("dghl2", StringType)
    .add("dghl3", StringType)
    .add("dghl4", StringType)
    .add("vsys_name", StringType)
    .add("device_name", StringType)
    .add("action_source", StringType)
    .add("src_vm_uuid", StringType)
    .add("dst_vm_uuid", StringType)
    .add("tunnel_id_imsi", StringType)
    .add("monitor_tag_imei", StringType)
    .add("parent_session_id", StringType)
    .add("parent_start_time", StringType)
    .add("tunnel_type", StringType)
    .add("sctp_association_id", StringType)
    .add("sctp_chunks", StringType)
    .add("sctp_chunks_sent", StringType)
    .add("sctp_chunks_received", StringType)
    .add("uuid_for_rule", StringType)
    .add("http_2_connection", StringType)
    .add("created", TimestampType)
}
The output for the above code is as follows:
+---------+----------+-----------+-------------+-------------+------------+-------------------+-----------+--------------+------+------+-------+-------+---------+--------+--------+---+----+--------+--------+-------+-------+--------------+-----------+----------+------------+--------+--------+------------+------------+-----+--------+------+-------------+---------+--------+--------+---------+-------+--------+----------------+----------------+-----------+------------+-------+-----------+---------+---------+----------+---------+---------------+-------+------+-------+---------+---------+-----+-----+-----+-----+---------+-----------+-----------+-----------+-----------+-----------+--------------+----------------+-----------------+-----------------+-----------+---------------+---------------+-----------+-------------------+-------------------+------------+-----------------+-------------+-----------------+-------+
|date_time|log_source|future_use1|received_time|serial_number|traffic_type|threat_content_type|future_use2|generated_time|src_ip|dst_ip|src_nat|dst_nat|rule_name|src_user|dst_user|app|vsys|src_zone|dst_zone|igr_int|egr_int|log_fw_profile|future_use3|session_id|repeat_count|src_port|dst_port|src_nat_port|dst_nat_port|flags|protocol|action|miscellaneous|threat_id|category|severity|direction|seq_num|act_flag|src_geo_location|dst_geo_location|future_use4|content_type|pcap_id|file_digest|apt_cloud|url_index|user_agent|file_type|x_forwarded_for|referer|sender|subject|recipient|report_id|dghl1|dghl2|dghl3|dghl4|vsys_name|device_name|future_use5|src_vm_uuid|dst_vm_uuid|http_method|tunnel_id_imsi|monitor_tag_imei|parent_session_id|parent_start_time|tunnel_type|threat_category|content_version|future_use6|sctp_association_id|payload_protocol_id|http_headers|url_category_list|uuid_for_rule|http_2_connection|created|
+---------+----------+-----------+-------------+-------------+------------+-------------------+-----------+--------------+------+------+-------+-------+---------+--------+--------+---+----+--------+--------+-------+-------+--------------+-----------+----------+------------+--------+--------+------------+------------+-----+--------+------+-------------+---------+--------+--------+---------+-------+--------+----------------+----------------+-----------+------------+-------+-----------+---------+---------+----------+---------+---------------+-------+------+-------+---------+---------+-----+-----+-----+-----+---------+-----------+-----------+-----------+-----------+-----------+--------------+----------------+-----------------+-----------------+-----------+---------------+---------------+-----------+-------------------+-------------------+------------+-----------------+-------------+-----------------+-------+
+---------+----------+-----------+-------------+-------------+------------+-------------------+-----------+--------------+------+------+-------+-------+---------+--------+--------+---+----+--------+--------+-------+-------+--------------+-----------+----------+------------+--------+--------+------------+------------+-----+--------+------+-------------+---------+--------+--------+---------+-------+--------+----------------+----------------+-----------+------------+-------+-----------+---------+---------+----------+---------+---------------+-------+------+-------+---------+---------+-----+-----+-----+-----+---------+-----------+-----------+-----------+-----------+-----------+--------------+----------------+-----------------+-----------------+-----------+---------------+---------------+-----------+-------------------+-------------------+------------+-----------------+-------------+-----------------+-------+
threat_9_0_DF saved
20/01/08 14:59:49 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 69
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 69, created), TimestampType), true, false) AS created#773
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 69
at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:174)
at org.apache.spark.sql.Row$class.isNullAt(Row.scala:191)
at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:166)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_34$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289)
... 25 more
It looks like it does not matter that the messages are in JSON format, does it?
Let's then use a sample dataset of any schema and add a timestamp column.
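For example, in spark-shell (a minimal sketch; spark is the active SparkSession and the data itself does not matter):

import org.apache.spark.sql.functions.current_timestamp

// Any dataset will do; range gives us a one-column sample.
val messages = spark.range(3)
val withTs = messages.withColumn("created", current_timestamp())
withTs.printSchema
// root
//  |-- id: long (nullable = false)
//  |-- created: timestamp (nullable = false)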
That gives you a dataset with a timestamp column.
The following line in your code should work, too (you don't need lit):
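val newDF = msg.withColumn("created", current_timestamp())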
What do you mean by "This does not get cast to TimestampType"? How did you check that? Are you perhaps confusing TimestampType in Spark with the timestamp type in Cassandra? The Spark Cassandra Connector should handle the conversion. Let's give that a try:
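A sketch of the round trip (assuming spark.cassandra.connection.host points at a reachable Cassandra node; the keyspace ks and table testtable are hypothetical, created beforehand with CREATE TABLE ks.testtable (id bigint PRIMARY KEY, created timestamp)):

import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.functions.current_timestamp

// Write a small dataset with a Spark TimestampType column to Cassandra.
val withTs = spark.range(3).withColumn("created", current_timestamp())
withTs.write
  .cassandraFormat("testtable", "ks")
  .mode("append")
  .save()

If the append succeeds and created shows up as a proper timestamp in cqlsh, the connector is doing the Spark-to-Cassandra type conversion for you.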