I want to sort, by timestamp, some Avro files that I retrieve from HDFS.

The schema of my Avro files is:

headers : Map[String,String], body : String

Now the tricky part is that the timestamp is one of the key/value pairs in the map. So the timestamp is contained in the map like this:

key_1 -> value_1, key_2 -> value_2, timestamp -> 1234567, key_n -> value_n

Note that the type of the values is String.

I created a case class to build my dataset with this schema:

case class Root(headers : Map[String,String], body: String)

Creation of my dataset:

val ds = spark
          .read
          .format("com.databricks.spark.avro")
          .load(pathToHDFS)
          .as[Root]

I don't really know how to begin with this problem, since I can only get the columns headers and body. How can I get the nested values so I can finally sort by timestamp?

I would like to do something like this:

ds.select("headers").doSomethingToGetTheMapStructure.doSomeConversionStringToTimeStampForTheColumnTimeStamp("timestamp").orderBy("timestamp")

To be precise: I don't want to lose any data from my initial dataset, just apply a sorting operation.

I use Spark 2.3.0.

3 Answers

1
Leo C (Best Solution)

The loaded Dataset should look something similar to the sample dataset below:

case class Root(headers : Map[String, String], body: String)

val ds = Seq(
  Root(Map("k11"->"v11", "timestamp"->"1554231600", "k12"->"v12"), "body1"),
  Root(Map("k21"->"v21", "timestamp"->"1554134400", "k22"->"v22"), "body2")
).toDS

You can simply look up the Map by the timestamp key, cast the value to Long, and perform an orderBy as follows:

ds.
  withColumn("ts", $"headers"("timestamp").cast("Long")).
  orderBy("ts").
  show(false)
// +-------------------------------------------------+-----+----------+
// |headers                                          |body |ts        |
// +-------------------------------------------------+-----+----------+
// |[k21 -> v21, timestamp -> 1554134400, k22 -> v22]|body2|1554134400|
// |[k11 -> v11, timestamp -> 1554231600, k12 -> v12]|body1|1554231600|
// +-------------------------------------------------+-----+----------+

Note that $"headers"("timestamp") is just the same as using the apply column method (i.e. $"headers".apply("timestamp")).

Alternatively, you could also use getItem to access the Map by key, like:

$"headers".getItem("timestamp")
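Since the question asks not to change the original dataset's shape, note that the helper column need not be materialized at all. A minimal variation (reusing `ds` and the `$` syntax from `spark.implicits._` above) sorts directly on the map lookup:

```scala
// The sort expression is not added as a column, so the output schema
// keeps only the original `headers` and `body` columns, now ordered
// by the numeric value of the "timestamp" entry.
val sorted = ds.orderBy($"headers"("timestamp").cast("long"))
```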
2
SylarBenes

You can use Scala's sortBy, which takes a function. Note that this requires collecting the Dataset to the driver first, since sortBy is a collection method, not a Dataset one. I would advise you to explicitly declare the val as a Vector (or another collection); that way you will see the applicable functions in IntelliJ (if you're using IntelliJ) and it will definitely compile.

See my example below based on your code :

  case class Root(headers: Map[String, String], body: String)

  // collect() brings the data to the driver so that collection
  // methods such as sortBy become available
  val ds: Vector[Root] = spark
    .read
    .format("com.databricks.spark.avro")
    .load(pathToHDFS)
    .as[Root]
    .collect()
    .toVector

  val sorted = ds.sortBy(r => r.headers.get("timestamp").map(PROCESSING)).reverse

Edit: added reverse (assuming you want it descending). Inside the function that you pass as an argument, you would also put the processing of the timestamp.
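As a concrete sketch of this approach, here is a self-contained version with hypothetical sample data, where the PROCESSING step is assumed to be a simple String-to-Long parse:

```scala
case class Root(headers: Map[String, String], body: String)

val rows: Vector[Root] = Vector(
  Root(Map("k1" -> "v1", "timestamp" -> "1554231600"), "body1"),
  Root(Map("k2" -> "v2", "timestamp" -> "1554134400"), "body2")
)

// Parse the timestamp to Long inside the sort key; records with no
// "timestamp" key (None) sort before all Some values, and reverse
// makes the overall order descending.
val sorted = rows.sortBy(r => r.headers.get("timestamp").map(_.toLong)).reverse
```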

0
Yayati Sule
import org.apache.spark.sql.{Dataset, Encoder, Encoders}
import org.apache.spark.sql.functions.desc
import java.sql.Timestamp

case class Nested(key_1: String, key_2: String, timestamp: Timestamp, key_n: String)
case class Root(headers: Nested, body: String)

implicit val rootCodec: Encoder[Root] = Encoders.product[Root]

val avroDS: Dataset[Root] = spark.read
                                 .format("com.databricks.spark.avro")
                                 .load(pathToHDFS)
                                 .as[Root]

// desc takes a column name; the timestamp lives inside the headers struct
val sortedDS: Dataset[Root] = avroDS.orderBy(desc("headers.timestamp"))

This code snippet would directly cast your Avro data to Dataset[Root]. Note that it assumes headers is a fixed struct with known field names rather than an arbitrary map, unlike the schema in the question. You won't have to rely on importing spark.implicits._, and it eliminates the step of casting your timestamp field to TimestampType. Internally, Spark's TimestampType is implemented using java.sql.Timestamp.
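Since Spark's TimestampType maps to java.sql.Timestamp on the JVM side, a header value holding epoch seconds as a String (the 1554231600 below is just an example value) could also be converted by hand:

```scala
import java.sql.Timestamp

// java.sql.Timestamp takes milliseconds since the epoch, while the
// header value here is in seconds, hence the * 1000L
val seconds = "1554231600".toLong
val ts = new Timestamp(seconds * 1000L)
```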