I am new to PySpark and I am working with a severely imbalanced dataset (the minority class is about 0.75% of the data). To fix this imbalance I would like to apply SMOTE without converting the dataset to pandas, because it is very large and I want to take full advantage of Spark's distributed processing. (Context: this is a binary classification problem.)
Does anyone have an idea of how to do this with SMOTE or another similar/more suitable algorithm? Thank you!
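For context, the only Spark-native workaround I know of is to skip resampling entirely and use class weighting instead, roughly like the sketch below (it assumes my vectorised DataFrame has a "features" column and a binary "label" column, and that the downstream classifier accepts a weightCol, e.g. LogisticRegression):

from pyspark.sql import functions as F
from pyspark.ml.classification import LogisticRegression

# fraction of minority (positive) rows, about 0.75% in my case
pos_ratio = vectorised.filter(F.col("label") == 1).count() / vectorised.count()

# give the rare class a proportionally larger weight
weighted = vectorised.withColumn(
    "classWeight",
    F.when(F.col("label") == 1, 1.0 - pos_ratio).otherwise(pos_ratio))

lr = LogisticRegression(featuresCol="features", labelCol="label", weightCol="classWeight")
model = lr.fit(weighted)

But I would prefer to actually generate synthetic minority samples (SMOTE-style) rather than only reweight, so the question still stands.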
I tried the code below, where SmoteSampling() calls interpolate_features() to apply the SMOTE logic:
import random

from pyspark.ml.linalg import Vectors
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
def interpolate_features(base, features_list, k, percentageOver):
    newRows = []
    # number of synthetic samples to generate for this minority row
    nexs = int(percentageOver / 100)
    for _ in range(nexs):
        # pick a random "neighbour": one of the first k feature vectors of the minority class
        # (these are not true nearest neighbours, just the first k collected minority rows)
        neigh = random.randint(1, k)
        # difference between the neighbour's features and the base row's features
        difs = [n - f for n, f in zip(features_list[neigh], base)]
        # interpolate: move the base features a random fraction of the way towards the neighbour
        newRec = [f + random.random() * d for f, d in zip(base, difs)]
        # wrap the synthetic sample in a Row with the minority label (1)
        newRows.append(Row(features=Vectors.dense(newRec), label=1))
    return newRows
def SmoteSampling(df, k, minorityClass, majorityClass, percentageOver, percentageUnder):
    if percentageUnder > 100 or percentageUnder < 5:
        raise ValueError("Percentage Under must be in range 5 - 100")
    if percentageOver < 100:
        raise ValueError("Percentage Over must be at least 100")
    dataInput_min = df.filter(df['label'] == minorityClass)
    dataInput_maj = df.filter(df['label'] == majorityClass)
    # collect the minority-class feature vectors as plain Python lists
    features_list = dataInput_min.select('features').rdd.map(lambda row: row['features'].toArray().tolist()).collect()
    # for each minority row, generate synthetic rows by interpolating towards a random neighbour
    new_data_rdd = sc.parallelize(features_list).flatMap(lambda row: interpolate_features(row, features_list, k, percentageOver))
    new_data = spark.createDataFrame(new_data_rdd)
    # oversampled minority class = original minority rows + synthetic rows
    new_data_minor = dataInput_min.union(new_data)
    # undersample the majority class and combine it with the oversampled minority class
    balanced_data = new_data_minor.union(dataInput_maj.sample(False, float(percentageUnder) / 100))
    return balanced_data
smoted = SmoteSampling(vectorised, k=10, minorityClass=1, majorityClass=0, percentageOver=2000, percentageUnder=5)
However, when I apply it to my vectorised DataFrame, I get different results every time I run it.
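My guess is that the non-determinism comes from the unseeded random.randint / random.random calls that run inside flatMap on the executors, plus sample() being called without a seed, but I am not sure that explains it fully. For the undersampling step at least, passing a seed seems to make it reproducible, e.g.:

dataInput_maj.sample(False, float(percentageUnder) / 100, seed=42)

The random calls inside interpolate_features still happen on the executors, though, so seeding random on the driver would not fix those. Is this the reason for the varying results, or is something else going on?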