Below is the input DataFrame (in reality this is a very large DataFrame).
Get the latest income of an employee whose income-age timestamp is less than 10 months old. If there is no income record within the last 10 months, take the value from the sourcedIncome column for that account instead of the Income column. This logic goes into the calculateIncome function below (a sketch follows the code).
Expected output DataFrame
Below is what I'm planning to implement:
import java.sql.Date
import org.apache.spark.sql.types.StructType

// INCOMEAGE and JOINDATE are dates (the case class must match empSchema)
case class employee(EmployeeID: Int, INCOME: Int, INCOMEAGE: Date, JOINDATE: Date, DEPT: String)

val empSchema = new StructType()
  .add("EmployeeID", "int")
  .add("INCOME", "int")
  .add("INCOMEAGE", "date")
  .add("JOINDATE", "date")
  .add("DEPT", "string")
import sparkSession.implicits._

// Read from the input file
val readEmpFile = sparkSession.read
.option("sep", ",")
.schema(empSchema)
.csv(INPUT_DIRECTORY)
// Create the employee Dataset (as[...] yields a Dataset[employee], not a DataFrame)
val empDs = readEmpFile.as[employee]

// Group by EmployeeID and derive one record per employee via performETL
val groupByDf = empDs.groupByKey(a => a.EmployeeID)
val k = groupByDf.mapGroups((key, value) => performETL(value))
def performETL(empData: Iterator[employee]): employee = {
  val empList = empData.toList
  // calculateIncome figures out the latest income for the account that is
  // < 10 months old, falling back to sourcedIncome otherwise
  val income = calculateIncome(empList)
  // All rows in the group share the same EmployeeID; emit one record with the derived income
  val row = empList.head
  employee(row.EmployeeID, income, row.INCOMEAGE, row.JOINDATE, row.DEPT)
}
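For context, calculateIncome would roughly follow the sketch below. It is simplified: the EmployeeRecord type and its SOURCEDINCOME field are hypothetical stand-ins, since the sourcedIncome column is described above but is not part of empSchema.

// Hypothetical shape carrying the sourcedIncome column (not in empSchema above)
case class EmployeeRecord(EmployeeID: Int, INCOME: Int, SOURCEDINCOME: Int, INCOMEAGE: java.sql.Date)

def calculateIncome(records: List[EmployeeRecord]): Int = {
  val tenMonthsAgo = java.time.LocalDate.now().minusMonths(10)
  // Keep only incomes recorded within the last 10 months
  val recent = records.filter(_.INCOMEAGE.toLocalDate.isAfter(tenMonthsAgo))
  if (recent.nonEmpty)
    recent.maxBy(_.INCOMEAGE.toLocalDate.toEpochDay).INCOME // latest qualifying income
  else
    records.maxBy(_.INCOMEAGE.toLocalDate.toEpochDay).SOURCEDINCOME // fallback
}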
Is this the right approach? If not, please suggest a better way to implement it. The solution has to be feasible for both batch and Structured Streaming.
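For the streaming case, the read would look something like this sketch (same empSchema; a streaming file source requires an explicit schema):

// Streaming variant of the read; the same groupByKey/mapGroups pipeline
// would be built on top of this Dataset
val streamingEmp = sparkSession.readStream
  .option("sep", ",")
  .schema(empSchema)
  .csv(INPUT_DIRECTORY)
  .as[employee]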