Spark DataFrames: Reducing By Key

Let's say I have a data structure like this, where ts is some timestamp:

case class Record(ts: Long, id: Int, value: Int)

Given a large number of these records, I want to end up with the record with the highest timestamp for each id. Using the RDD API, I think the following code gets the job done:

def findLatest(records: RDD[Record])(implicit spark: SparkSession) = {
  records.keyBy(_.id).reduceByKey{
    (x, y) => if(x.ts > y.ts) x else y
  }.values
}

Likewise this is my attempt with datasets:

def findLatest(records: Dataset[Record])(implicit spark: SparkSession) = {
  records.groupByKey(_.id).mapGroups{
    case(id, records) => {
      records.reduceLeft((x,y) => if (x.ts > y.ts) x else y)
    }
  }
}

I've been trying to work out how to achieve something similar with dataframes, but to no avail. I realise I can do the grouping with:

records.groupBy($"id")

But that gives me a RelationalGroupedDataset, and it's not clear to me what aggregation function I need to write to achieve what I want. All the example aggregations I've seen appear to focus on returning just the single column being aggregated rather than the whole row.

Is it possible to achieve this using dataframes?

There are 2 answers

Answer by Assaf Mendelson (best answer)

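You can use the argmax logic (see the Databricks example): take the max of a struct whose first field is the timestamp. Structs are compared field by field, left to right, so the max is the row with the highest ts, and the other fields of that row come along with it.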

For example, let's say your dataframe is called df and has the columns id, val and ts. You would do something like this:

import org.apache.spark.sql.functions._
// 'id and $"id" need spark.implicits._ in scope
val newDF = df.groupBy('id).agg(max(struct('ts, 'val)) as 'tmp).select($"id", $"tmp.*")
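
A self-contained sketch of the same idea, using the column names from the question (ts, id, value), may make the struct ordering easier to see. The object name LatestPerId, the helper latestPerId, the local master and the sample rows are all made up for illustration; only max, struct and the groupBy/agg/select chain come from the answer above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, max, struct}

// Hypothetical wrapper object, not part of the original answer.
object LatestPerId {

  // Keeps, for each id, the row with the highest ts.
  // ts must be the first struct field because structs are
  // compared field by field, left to right.
  def latestPerId(df: DataFrame): DataFrame =
    df.groupBy(col("id"))
      .agg(max(struct(col("ts"), col("value"))).as("tmp"))
      .select(col("id"), col("tmp.*")) // flatten the struct back into columns

  def main(args: Array[String]): Unit = {
    // Assumed local session, for trying the sketch out only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("latest-per-id")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample data: three rows for id 1, one row for id 2.
    val df = Seq(
      (1L, 1, 10),
      (3L, 1, 30),
      (2L, 1, 20),
      (5L, 2, 50)
    ).toDF("ts", "id", "value")

    latestPerId(df).orderBy("id").show()
    spark.stop()
  }
}
```

With this sample data, id 1 should keep the row with ts 3 and id 2 the row with ts 5. If two rows for the same id share a timestamp, the comparison falls through to the next struct field, so the one with the larger value wins.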

Answer by user238607

For Datasets I did this (tested on Spark 2.1.1):

final case class AggregateResultModel(id: String,
                                      mtype: String,
                                      healthScore: Int,
                                      mortality: Float,
                                      reimbursement: Float)
.....
.....

// assume that rawScores was loaded beforehand from JSON or CSV files

val groupedResultSet = rawScores.as[AggregateResultModel].groupByKey( item => (item.id,item.mtype ))
      .reduceGroups( (x,y) => getMinHealthScore(x,y)).map(_._2)


// the binary function used in the reduceGroups

def getMinHealthScore(x: AggregateResultModel, y: AggregateResultModel): AggregateResultModel = {
  // complex logic for deciding which row to keep
  // 1. the lower healthScore wins
  if (x.healthScore > y.healthScore) y
  else if (x.healthScore < y.healthScore) x
  // 2. on equal healthScore, the higher mortality wins
  else if (x.mortality < y.mortality) y
  else if (x.mortality > y.mortality) x
  // 3. on equal mortality, the lower reimbursement wins (y on a full tie)
  else if (x.reimbursement < y.reimbursement) x
  else y
}
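
Applied to the Record type from the question, the same reduceGroups pattern might look like the sketch below. The object name LatestPerIdDataset is made up for illustration, and the comparison (keep y when the timestamps are equal) is simply copied from the question's own reduce function.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical wrapper object, not part of the original answer.
object LatestPerIdDataset {

  // Same shape as the case class in the question.
  final case class Record(ts: Long, id: Int, value: Int)

  def findLatest(records: Dataset[Record])(implicit spark: SparkSession): Dataset[Record] = {
    import spark.implicits._ // encoders for Int keys and Record values
    records
      .groupByKey(_.id)
      // reduceGroups yields (key, reducedValue) pairs
      .reduceGroups((x, y) => if (x.ts > y.ts) x else y)
      // drop the key and keep only the winning record
      .map(_._2)
  }
}
```

Unlike mapGroups, reduceGroups takes a binary function, so the rows of a group are combined pairwise rather than handed over as one iterator.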