How to apply SMOTE algorithm (or an alternative) on a highly imbalanced PySpark dataset?


I am new to PySpark and I am working with a severely imbalanced PySpark dataset (imbalance ratio: 0.75%). To fix this imbalance, I thought of applying SMOTE to it without having to convert my dataset to pandas, because it is very large and I want to get the most out of Spark's distributed processing. (Context: this is a binary classification problem.)

Does anyone have an idea of how to do this with SMOTE or another similar/more useful algorithm? Thank you!
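One alternative I have also been considering, which avoids generating synthetic rows entirely, is class weighting: several Spark ML classifiers accept a weightCol, so the minority class can simply be up-weighted. A minimal sketch of that idea, assuming a DataFrame df that already has features and label columns (df, the weight formula and the classWeight column name are placeholders, not my real pipeline):

from pyspark.sql import functions as F
from pyspark.ml.classification import LogisticRegression

# up-weight minority rows by the inverse of their frequency (placeholder choice of weight)
minority_ratio = df.filter(F.col("label") == 1).count() / df.count()
weighted = df.withColumn(
    "classWeight",
    F.when(F.col("label") == 1, 1.0 / minority_ratio).otherwise(1.0),
)

# LogisticRegression, for example, exposes a weightCol parameter
lr = LogisticRegression(featuresCol="features", labelCol="label", weightCol="classWeight")
model = lr.fit(weighted)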

I tried the code below, where SmoteSampling() calls interpolate_features() to apply the SMOTE logic:

import random
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row, SparkSession

# get (or create) the session and its SparkContext, which the functions below rely on
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
                        
def interpolate_features(base, features_list, k, percentageOver):
    newRows = []
    # nexs = number of synthetic samples to generate per minority row
    nexs = int(percentageOver / 100)
    # make sure we never index past the end of the minority sample list
    k = min(k, len(features_list) - 1)

    for _ in range(nexs):
        # pick a random "neighbour" among the first k minority samples
        # (a simplification of SMOTE, which would use the k nearest neighbours of base)
        neigh = random.randint(1, k)
        # difference between the neighbour's features and this row's features
        difs = [n - b for n, b in zip(features_list[neigh], base)]
        # interpolate: move this row a random fraction of the way towards the neighbour
        newRec = [b + random.random() * d for b, d in zip(base, difs)]
        # each synthetic row gets the interpolated feature vector and the minority label
        newRows.append(Row(features=Vectors.dense(newRec), label=1))

    return newRows

def SmoteSampling(df, k, minorityClass, majorityClass, percentageOver, percentageUnder):
    if percentageUnder > 100 or percentageUnder < 3:
        raise ValueError("Percentage Under must be in range 3 - 100")
    if percentageOver < 100:
        raise ValueError("Percentage Over must be at least 100")

    # split the input DataFrame (not a global variable) into minority and majority classes
    dataInput_min = df.filter(df['label'] == minorityClass)
    dataInput_maj = df.filter(df['label'] == majorityClass)

    # collect the minority feature vectors to the driver as plain Python lists
    features_list = dataInput_min.select('features').rdd.map(
        lambda row: row['features'].toArray().tolist()).collect()
    # for each minority row, interpolate towards a random neighbour to create synthetic rows
    new_data_rdd = sc.parallelize(features_list).flatMap(
        lambda row: interpolate_features(row, features_list, k, percentageOver))
    new_data = spark.createDataFrame(new_data_rdd)

    # keep only the columns the synthetic rows have, so the unions line up
    new_data_minor = dataInput_min.select('features', 'label').union(new_data)
    # undersample the majority class and append it to the oversampled minority class
    balanced_data = new_data_minor.union(
        dataInput_maj.select('features', 'label').sample(False, float(percentageUnder) / 100))

    return balanced_data


smoted = SmoteSampling(vectorised, k = 10, minorityClass = 1, majorityClass = 0, percentageOver = 2000, percentageUnder = 5)
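For context, SmoteSampling expects vectorised to have a features vector column and a numeric label column. A minimal sketch of how such a DataFrame can be built with VectorAssembler (the column names and values here are placeholders, not my actual data):

from pyspark.ml.feature import VectorAssembler

# toy example: two numeric feature columns plus a binary label
raw = spark.createDataFrame(
    [(0.1, 1.2, 0), (0.3, 0.7, 0), (2.5, 3.1, 1), (2.7, 2.9, 1)],
    ["f1", "f2", "label"],
)

# assemble the numeric columns into the single 'features' vector column SmoteSampling expects
assembler = VectorAssembler(inputCols=["f1", "f2"], outputCol="features")
vectorised = assembler.transform(raw).select("features", "label")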

However, when I apply it to my vectorised dataframe, I get different results every time I run it.
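As far as I can tell, the run-to-run variation is expected with this code: SMOTE draws a random neighbour and a random interpolation factor for every synthetic row, those draws happen inside the flatMap on the executors (so calling random.seed() on the driver would not help), and sample() is called without a seed. If I want repeatable output, I think I could seed a private RNG from each row's contents; a minimal sketch of a seeded variant of the interpolation function (the seed parameter and the _seeded name are just my own guess at a fix, not part of the original code):

def interpolate_features_seeded(base, features_list, k, percentageOver, seed=42):
    # use a private RNG seeded from the row contents, so the same input row
    # produces the same synthetic rows on every run, even on Spark workers
    rng = random.Random(hash((seed, tuple(base))))
    k = min(k, len(features_list) - 1)
    newRows = []
    for _ in range(int(percentageOver / 100)):
        neigh = rng.randint(1, k)
        difs = [n - b for n, b in zip(features_list[neigh], base)]
        newRec = [b + rng.random() * d for b, d in zip(base, difs)]
        newRows.append(Row(features=Vectors.dense(newRec), label=1))
    return newRows

Wiring this into SmoteSampling and passing an explicit seed to the majority-class undersampling, e.g. dataInput_maj.sample(False, float(percentageUnder) / 100, seed=42), should make successive runs on the same input produce the same balanced DataFrame.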

There are 0 answers