Select entire row based on a logic applied on 2 columns across multiple rows


Below is the input DataFrame (in reality it is very large):

[image: input DataFrame]

For each employee, get the latest income whose Income_age is less than 10 months. If there is no income data within the last 10 months, take the value from the sourcedIncome column for that account instead of the Income column. This logic is meant to go into the calculateIncome function called below.
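To make the rule concrete, here is a minimal plain-Scala sketch of what calculateIncome might look like. The IncomeRecord type, its field names, and the assumption that the income age is already available as a whole number of months are all hypothetical, since the real column layout is only shown in the missing image. It also assumes "latest" means the row with the smallest income age.

```scala
// Hypothetical record shape; the real column names and types come from the input DataFrame.
final case class IncomeRecord(
  employeeId: Int,
  income: Int,
  incomeAgeMonths: Int, // assumed: age of the income figure in whole months
  sourcedIncome: Int    // assumed: fallback value present on every row of the account
)

object IncomeLogic {
  val MaxAgeMonths = 10

  // Returns the most recent income younger than MaxAgeMonths,
  // or the account's sourcedIncome when no row qualifies.
  // Returns None only when the group has no rows at all.
  def calculateIncome(rows: List[IncomeRecord]): Option[Int] = {
    val recent = rows.filter(_.incomeAgeMonths < MaxAgeMonths)
    if (recent.nonEmpty) Some(recent.minBy(_.incomeAgeMonths).income)
    else rows.headOption.map(_.sourcedIncome)
  }
}
```

For an employee with incomes aged 12, 3 and 7 months, this picks the 3-month-old income; for an employee whose only income is 11 months old, it falls back to sourcedIncome.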

Expected output Dataframe

[image: expected output DataFrame]

Below is what I'm planning to implement

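import java.sql.Date
import org.apache.spark.sql.types.StructType

// Field types must match the schema below, otherwise .as[employee] fails at analysis time
case class employee(EmployeeID: Int, INCOME: Int, INCOMEAGE: Date, JOINDATE: Date, DEPT: String)

val empSchema = new StructType()
  .add("EmployeeID", "Int")
  .add("INCOME", "Int")
  .add("INCOMEAGE", "Date")
  .add("JOINDATE", "Date")
  .add("DEPT", "String")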

//Reading from the File
import sparkSession.implicits._

val readEmpFile = sparkSession.read
  .option("sep", ",")
  .schema(empSchema)
  .csv(INPUT_DIRECTORY)

//Create employee DataFrame
val custDf = readEmpFile.as[employee]

//Adding Salary Column
val groupByDf = custDf.groupByKey(a => a.EmployeeID)
val k = groupByDf.mapGroups((key,value) => performETL(value))

def performETL(empData: Iterator[employee]): employee = {
  // Materialize the group once; an Iterator can only be traversed a single time
  val empList = empData.toList
  // calculateIncome holds the logic that finds the latest income for an account that is < 10 months old
  val income = calculateIncome(empList)

  // Groups passed to mapGroups are never empty, so head is safe here.
  // Emit one row per employee: the first row's fields with the computed income.
  empList.head.copy(INCOME = income)
}

Is this the right approach to implement? If not, please suggest a better way to implement the same.

The solution has to be feasible for both batch and Structured Streaming.
