Is there no "inverse_transform" method for a scaler like MinMaxScaler in Spark?


When training a model, say a linear regression, we may apply a normalization such as MinMaxScaler to the train and test datasets.

After we have a trained model and use it to make predictions, we need to scale the predictions back to the original representation.

In Python, there is an "inverse_transform" method. For example:

from sklearn.preprocessing import MinMaxScaler

data = [[-1, 2], [-0.5, 6], [0, 10], [1, 18]]

scaler = MinMaxScaler(copy=True, feature_range=(0, 1))
print(data)

dataScaled = scaler.fit(data).transform(data)
print(dataScaled)

scaler.inverse_transform(dataScaled)

Is there a similar method in Spark?

I have googled a lot, but found no answer. Can anyone give me some suggestions? Thank you very much!


There are 3 answers

Answer by Aleksandrs Krivickis

Maybe I'm too late to the party; however, I recently faced exactly the same problem and couldn't find any viable solution.

I am presuming that the author of this question doesn't need to invert MinMax-scaled vectors, but only a single column, and that the min and max values of that column, as well as the min-max parameters of the scaler, are known.

The maths behind MinMaxScaler, as per the scikit-learn website:

X_std = (X - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0))
X_scaled = X_std * (max - min) + min

"Reverse-engineered" MinMaxScaler formula

X_scaled = (X - Xmin) / (Xmax - Xmin) * (max - min) + min
X = (max * Xmin - min * Xmax - Xmin * X_scaled + Xmax * X_scaled)/(max - min)
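
As a quick sanity check of the inverse formula (not part of the original answer), here it is applied to the y column of the question's data, where Xmin = 2, Xmax = 18 and feature_range = (min, max) = (0, 1):

Xmin, Xmax, _min, _max = 2, 18, 0, 1
for X in [2, 6, 10, 18]:
    X_scaled = (X - Xmin) / (Xmax - Xmin) * (_max - _min) + _min
    X_back = (_max * Xmin - _min * Xmax - Xmin * X_scaled + Xmax * X_scaled) / (_max - _min)
    print(X, X_scaled, X_back)  # X_back recovers the original X, e.g. 6 -> 0.25 -> 6.0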

Implementation

from sklearn.preprocessing import MinMaxScaler
import pandas

data = [[-1, 2], [-0.5, 6], [0, 10], [1, 18]]

scaler = MinMaxScaler(copy=True, feature_range=(0, 1))
print(data)
dataScaled = scaler.fit(data).transform(data)

# build a Spark DataFrame holding both the original and the scaled columns
# (assumes an existing SparkSession named `spark`, e.g. in a notebook)
data_sp = spark.createDataFrame(pandas.DataFrame(data, columns=["x", "y"]).join(pandas.DataFrame(dataScaled, columns=["x_scaled", "y_scaled"])))
data_sp.show()

print("Inversing column: y_scaled")
# min/max of the original column, plus the scaler's feature_range
Xmax = data_sp.select("y").rdd.max()[0]
Xmin = data_sp.select("y").rdd.min()[0]
_max = scaler.feature_range[1]
_min = scaler.feature_range[0]

print("Xmax =", Xmax, "Xmin =", Xmin, "max =", _max, "min =", _min)

# apply the inverted formula column-wise
data_sp.withColumn(colName="y_scaled_inversed", col=(_max * Xmin - _min * Xmax - Xmin * data_sp.y_scaled + Xmax * data_sp.y_scaled) / (_max - _min)).show()

Outputs

[[-1, 2], [-0.5, 6], [0, 10], [1, 18]]
+----+---+--------+--------+
|   x|  y|x_scaled|y_scaled|
+----+---+--------+--------+
|-1.0|  2|     0.0|     0.0|
|-0.5|  6|    0.25|    0.25|
| 0.0| 10|     0.5|     0.5|
| 1.0| 18|     1.0|     1.0|
+----+---+--------+--------+

Inversing column: y_scaled
Xmax = 18 Xmin = 2 max = 1 min = 0
+----+---+--------+--------+-----------------+
|   x|  y|x_scaled|y_scaled|y_scaled_inversed|
+----+---+--------+--------+-----------------+
|-1.0|  2|     0.0|     0.0|              2.0|
|-0.5|  6|    0.25|    0.25|              6.0|
| 0.0| 10|     0.5|     0.5|             10.0|
| 1.0| 18|     1.0|     1.0|             18.0|
+----+---+--------+--------+-----------------+
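
A side note (not part of the original answer): if the scaling is done with Spark's own pyspark.ml.feature.MinMaxScaler instead of scikit-learn, the fitted MinMaxScalerModel already exposes the per-feature minima and maxima via originalMin and originalMax, so nothing has to be recomputed from the data. A minimal sketch along the same lines, reusing the data_sp DataFrame from above and inverting the y feature (index 1 of the assembled vector):

from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# Spark's MinMaxScaler works on a vector column, so assemble x and y first
assembler = VectorAssembler(inputCols=["x", "y"], outputCol="features")
assembled = assembler.transform(data_sp.select("x", "y"))

mm = MinMaxScaler(inputCol="features", outputCol="features_scaled", min=0.0, max=1.0)
mm_model = mm.fit(assembled)
scaled = mm_model.transform(assembled)

# the fitted model keeps the per-feature minima/maxima seen at fit time
y_idx = 1  # position of "y" inside the assembled vector
Xmin = float(mm_model.originalMin[y_idx])
Xmax = float(mm_model.originalMax[y_idx])
_min, _max = mm.getMin(), mm.getMax()

# pull the scaled y value out of the vector and apply the same inverse formula
get_y = F.udf(lambda v: float(v[y_idx]), DoubleType())
scaled.withColumn("y_scaled", get_y("features_scaled")) \
      .withColumn("y_scaled_inversed",
                  (_max * Xmin - _min * Xmax - Xmin * F.col("y_scaled") + Xmax * F.col("y_scaled")) / (_max - _min)) \
      .show()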
Answer by yaakov tayeb

No direct solution here. Since passing an array to a UDF can only be done when the array is a column (lit(array) won't do the trick), I am using the following workaround.

In a nutshell, it turns the scales arrays into strings, passes them to the UDF, and does the maths there.

You can then use those scales strings in an inverse function (also attached here) to get the inverted values.

Code:

import numpy as np

from pyspark.sql.functions import col, lit, udf
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors, VectorUDT

df = spark.createDataFrame([
    (0, 1, 0.5, -1),
    (1, 2, 1.0, 1),
    (2, 4, 10.0, 2)
], ["id", 'x1', 'x2', 'x3'])

df.show()

def Normalize(df):
    # per-column mean and stddev from describe(); cast the string values to double
    scales = df.describe()
    scales = scales.filter("summary = 'mean' or summary = 'stddev'")
    scales = scales.select(["summary"] + [col(c).cast("double") for c in scales.columns[1:]])

    assembler = VectorAssembler(
        inputCols=scales.columns[1:],
        outputCol="X_scales")

    df_scales = assembler.transform(scales)

    x_mean = df_scales.filter("summary = 'mean'").select('X_scales')
    x_std = df_scales.filter("summary = 'stddev'").select('X_scales')

    # serialize the mean/std vectors to pipe-delimited strings so they can be
    # passed to the UDF as literal columns
    ks_std_lit = lit('|'.join([str(s) for s in list(x_std.collect()[0].X_scales)]))
    ks_mean_lit = lit('|'.join([str(s) for s in list(x_mean.collect()[0].X_scales)]))

    # assemble all input columns into a feature vector and standardize it
    assembler = VectorAssembler(
        inputCols=df.columns[0:4],
        outputCol="features")

    df_features = assembler.transform(df)
    df_features = df_features.withColumn('Scaled', exec_norm_udf(df_features.features, ks_mean_lit, ks_std_lit))

    return df_features, ks_mean_lit, ks_std_lit

def exec_norm(vector, x_mean, x_std):
    # decode the pipe-delimited mean/std strings back into float lists
    x_mean = [float(s) for s in x_mean.split('|')]
    x_std = [float(s) for s in x_std.split('|')]

    # standardize: (x - mean) / std
    res = (np.array(vector) - np.array(x_mean)) / np.array(x_std)
    res = list(res)

    return Vectors.dense(res)


exec_norm_udf = udf(exec_norm, VectorUDT())


def scaler_invert(vector, x_mean, x_std):
    # decode the pipe-delimited mean/std strings back into float lists
    x_mean = [float(s) for s in x_mean.split('|')]
    x_std = [float(s) for s in x_std.split('|')]

    # invert the standardization: x * std + mean
    res = (np.array(vector) * np.array(x_std)) + np.array(x_mean)
    res = list(res)

    return Vectors.dense(res)


scaler_invert_udf = udf(scaler_invert, VectorUDT())


df, scaler_mean, scaler_std = Normalize(df)
df.withColumn('inverted', scaler_invert_udf(df.Scaled, scaler_mean, scaler_std)).show(truncate=False)
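
A note on the design choice (not from the original answer): because a Python UDF is an ordinary closure, the collected mean/std lists could also be captured by the UDF directly instead of being serialized to pipe-delimited strings, which would remove the split('|') round-trip; the string approach above keeps the scales visible as literal columns and works the same way regardless of Spark version.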
Answer by Simone Grandi

In our company, in order to solve the same problem for the StandardScaler, we extended spark.ml with this (among other things):

package org.apache.spark.ml

import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.ml.util.Identifiable

// The package object lives inside org.apache.spark.ml so that the
// package-private StandardScalerModel constructor below is accessible.
package object feature {

    implicit class RichStandardScalerModel(model: StandardScalerModel) {

        // A scaler with std' = 1 / sigma and mean' = -mu / sigma undoes the
        // original standardization (x - mu) / sigma.
        private def invertedStdDev(sigma: Double): Double = 1 / sigma

        private def invertedMean(mu: Double, sigma: Double): Double = -mu / sigma

        def inverse(newOutputCol: String): StandardScalerModel = {
            val sigma: linalg.Vector = model.std
            val mu: linalg.Vector = model.mean
            val newSigma: linalg.Vector = new DenseVector(sigma.toArray.map(invertedStdDev))
            val newMu: linalg.Vector = new DenseVector(mu.toArray.zip(sigma.toArray).map { case (m, s) => invertedMean(m, s) })
            val inverted: StandardScalerModel = new StandardScalerModel(Identifiable.randomUID("stdScal"), newSigma, newMu)
                .setInputCol(model.getOutputCol)
                .setOutputCol(newOutputCol)

            inverted
                .set(inverted.withMean, model.getWithMean)
                .set(inverted.withStd, model.getWithStd)
        }
    }

}

It should be fairly easy to modify it or do something similar for your specific case.
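
For reference (not from the original answer), a similar inversion can also be done from PySpark without touching spark.ml internals, by reading mean and std off the fitted model and undoing the transformation in a UDF. A rough sketch, assuming a fitted pyspark.ml.feature.StandardScalerModel named scaler_model trained with both withMean and withStd enabled, and a scaled vector column features_scaled (both names are made up for the example):

import numpy as np

from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT

def inverse_standard_scaler(scaler_model, scaled_col):
    # per-feature statistics of the fitted StandardScalerModel
    mean = scaler_model.mean.toArray()
    std = scaler_model.std.toArray()

    def invert(vector):
        # undo (x - mean) / std element-wise
        return Vectors.dense(list(np.array(vector) * std + mean))

    return udf(invert, VectorUDT())(scaled_col)

# usage (hypothetical column names):
# df.withColumn("features_orig", inverse_standard_scaler(scaler_model, df.features_scaled))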

Keep in mind that, due to the JVM's double (floating-point) arithmetic, you normally lose some precision in these operations, so you will not recover the exact original values you had before the transformation (e.g. you will probably get something like 1.9999999999999998 instead of 2.0).