Find normal values using min and max ranges on a Scala DataFrame


I have a DataFrame with 39 columns, and each column has a different normal range. Using that range, I want to flag each value: put 0 if the value is within the column's normal range, otherwise put 1.

This is what I did, but I need to do it for all 39 columns:

val test: Double => Double = (value: Double) =>
  if (value >= 45 && value <= 62) 0 else 1

But I don't understand how to apply different bounds to each column.

For example, I have this DataFrame:

+--------------------+---------+-------------------------+---------+
|a                   |b        |c                        |d        |
+--------------------+---------+-------------------------+---------+
|               207.0|     40.0|                    193.0|     39.0|
|                98.0|     17.0|                    193.0|     15.0|
|               207.0|     13.0|                    193.0|     17.0|
|               207.0|     26.0|                    193.0|     23.0|
|               207.0|     35.0|                    193.0|     24.0|
|               207.0|     91.0|                    193.0|     45.0|
|               207.0|     40.0|                    193.0|     37.0|
|               207.0|     23.0|                    193.0|     23.0|
|               207.0|     26.0|                    193.0|     22.0|
|               207.0|     39.0|                    193.0|     34.0|

I want the result below, using these ranges:

col  range
a   50-160
b   1-21
c   5-40
d   7-27

If the value is within the range, put 0; otherwise put 1:

+--------------------+---------+-------------------------+---------+
|a                   |b        |c                        |d        |
+--------------------+---------+-------------------------+---------+
|                 1.0|      1.0|                      1.0|      1.0|
|                 0.0|      0.0|                      1.0|      0.0|
|                 1.0|      0.0|                      1.0|      0.0|
|                 1.0|      1.0|                      1.0|      0.0|
|                 1.0|      1.0|                      1.0|      0.0|
|                 1.0|      1.0|                      1.0|      1.0|
|                 1.0|      1.0|                      1.0|      1.0|
|                 1.0|      1.0|                      1.0|      0.0|
|                 1.0|      1.0|                      1.0|      0.0|
|                 1.0|      1.0|                      1.0|      1.0|

I want to do this for all 39 columns (Scala/PySpark preferred).
1 Answer

belka (accepted answer):

You should define a user-defined function (UDF), and then apply it to every column you want.

Here is the documentation on user-defined functions in Scala. It's quite complete, and I encourage you to read it.

Here is an extract to help you quickly see where I'm going with this:

scala> df.withColumn("upper", upper('text)).show
+---+-----+-----+
| id| text|upper|
+---+-----+-----+
|  0|hello|HELLO|
|  1|world|WORLD|
+---+-----+-----+

// You could have also defined the UDF this way
val upperUDF = udf { s: String => s.toUpperCase }

// or even this way
val upperUDF = udf[String, String](_.toUpperCase)

scala> df.withColumn("upper", upperUDF('text)).show
+---+-----+-----+
| id| text|upper|
+---+-----+-----+
|  0|hello|HELLO|
|  1|world|WORLD|
+---+-----+-----+

You can see that the function applies to the entire column and that the result is a new Column. Hence, your function should look like this:

def isInRange(e: Double, min: Double, max: Double): Boolean = e >= min && e <= max

Then, for a given minValue and maxValue, all you do is:

myDF.withColumn("isInRange_a", udf((x: Double) => isInRange(x, minValue, maxValue)).apply(myDF("a")))
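
For instance, wiring that up for your column a with its 50-160 range could look like this (a minimal sketch; isInRange is the helper defined above, and the bounds come from your range table):

import org.apache.spark.sql.functions.udf

val minValue = 50.0  // lower bound for column "a", from the range table
val maxValue = 160.0 // upper bound for column "a"

// wrap the plain Scala predicate in a Spark UDF, then apply it to the column
val inRangeA = udf((x: Double) => isInRange(x, minValue, maxValue))
val flaggedDF = myDF.withColumn("isInRange_a", inRangeA(myDF("a")))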

What you can do now, to apply it over a given List/DataFrame containing (varName, minValue, maxValue) triples, is:

  • either a map/reduce operation, where you would compute, for each column, whether each value is in the given range or not. Then you would join on a given key (I don't know enough about your problem to be specific here). This solution works but becomes very inefficient as the data grows, because you could have several keys that look alike.

  • or a recursive operation, where the goal is to perform something like: myDF.withColumn(...).withColumn(...).withColumn(...), etc.

I'll go with the second solution because of the keys that could look alike.

How do you do it?

def applyMyUDFRecursively(myDF: DataFrame, rangesList: List[(String, Double, Double)]): DataFrame =
  if (rangesList == null || rangesList.isEmpty) myDF
  else applyMyUDFRecursively(
    myDF.withColumn(
      "isInRange_" + rangesList.head._1,
      udf((x: Double) => isInRange(x, rangesList.head._2, rangesList.head._3)).apply(myDF(rangesList.head._1))),
    rangesList.tail)
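
As an aside, the same column chaining can be written without explicit recursion using a foldLeft over the ranges list; this is a sketch under the same assumptions (the isInRange helper above and (name, min, max) tuples):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

def applyMyUDFWithFold(myDF: DataFrame, rangesList: List[(String, Double, Double)]): DataFrame =
  rangesList.foldLeft(myDF) { case (df, (name, min, max)) =>
    // each step adds one "isInRange_<name>" flag column to the accumulator
    df.withColumn("isInRange_" + name, udf((x: Double) => isInRange(x, min, max)).apply(df(name)))
  }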

Now you've applied it to all your columns, but you may end up with too many columns. To drop the original ones, do something like this:

resultDF.drop(rangesList.map(_._1): _*)

Notice the : _* type ascription, which expands the list produced by map into the varargs that drop expects.

Here, MyRange is just a tuple (String, Double, Double) holding the column name, the min, and the max.
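
Concretely, for your four sample columns, the ranges list would look like this (values copied from the range table in your question):

type MyRange = (String, Double, Double) // (column name, min, max)

val rangesList: List[MyRange] = List(
  ("a", 50.0, 160.0),
  ("b", 1.0, 21.0),
  ("c", 5.0, 40.0),
  ("d", 7.0, 27.0)
)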

E.g., for your DataFrame, a simpler version that reuses your test function would look like this:

def recApply(myDF: DataFrame, cols: List[String]): DataFrame =
  if (cols == null || cols.isEmpty) myDF
  else recApply(
    myDF.withColumn("isInRange_" + cols.head, udf(test).apply(myDF(cols.head))),
    cols.tail)

Then, apply this function to your DF and store your result:

val my_result = recApply(myDF, myDF.columns.toList)
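
One last detail: recApply reuses your test function, so its flags are already 0.0/1.0, but with the fixed 45-62 bounds for every column. If you go through applyMyUDFRecursively and isInRange instead, the flags are Booleans where true means in range, while your expected output uses 0 for in-range values. A when/otherwise turns the flags into 0/1 and replaces the original columns (a sketch, reusing the rangesList shown above):

import org.apache.spark.sql.functions.{col, when}

val flagged = applyMyUDFRecursively(myDF, rangesList)

// replace each original column with 0.0 (in range) / 1.0 (out of range),
// then drop the intermediate Boolean flag
val result = rangesList.map(_._1).foldLeft(flagged) { (df, name) =>
  df.withColumn(name, when(col("isInRange_" + name), 0.0).otherwise(1.0))
    .drop("isInRange_" + name)
}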