I have a training dataset with several feature columns. I standardize the features with Spark ML's StandardScaler, compute the covariance matrix of the scaled features, and then take the square root of that matrix. I'm using breeze.linalg to compute the covariance matrix. Here's my code:

import breeze.linalg.{DenseMatrix, cov, diag, svd}
import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler}
import org.apache.spark.ml.linalg.{Matrices, Matrix, Vector}
import org.apache.spark.sql.DataFrame

def generateMetrics(trainingData: DataFrame, featureColumnNames: Array[String]): (DataFrame, List[Array[Double]]) = {
  val scaledFeatureVecCol = "scaledFeatures"
  println("Training data total count - " + trainingData.count())
  val standardScalerDf = calculateStandardScalar(trainingData, featureColumnNames, scaledFeatureVecCol)
  println("Standard scaler output - " + standardScalerDf.count())
  val covarianceMatrixSqrt = calculateCovarianceMatrixSqrt(standardScalerDf, scaledFeatureVecCol)
  (standardScalerDf, covarianceMatrixSqrt)
}

def calculateStandardScalar(
    featuresDf: DataFrame,
    featureColumnNames: Array[String],
    outputColumn: String): DataFrame = {
  val vectorColumnName = "featureVectors"
  val assembler = new VectorAssembler()
    .setInputCols(featureColumnNames)
    .setOutputCol(vectorColumnName)
  val assembledData = assembler.transform(featuresDf)
  // calculate standard scalar
  val scaler = new StandardScaler()
    .setInputCol(vectorColumnName)
    .setOutputCol(outputColumn)
    .setWithMean(true)
    .setWithStd(true)
  val scalerModel = scaler.fit(assembledData)
  scalerModel.transform(assembledData)
}

def calculateCovarianceMatrixSqrt(featuresDf: DataFrame, featureVecColumn: String): List[Array[Double]] = {
  // Extract standardized features
  val scaledFeatures = featuresDf.select(featureVecColumn).rdd.map(row => row.getAs[Vector](0).toArray).collect()
  // Convert scaled features to Breeze DenseMatrix
  val breezeScaledFeatures =
    new DenseMatrix(rows = scaledFeatures.length, cols = scaledFeatures.head.size, data = scaledFeatures.flatten)
  // Calculate covariance matrix
  println("Calculate covariance matrix")
  val covMatrixDense: DenseMatrix[Double] = cov(breezeScaledFeatures.t)
  val svd.SVD(u, s, vt) = svd(covMatrixDense)
  // Calculate the square root of the singular values
  val sqrtSingularValues = diag(s.map(value => math.sqrt(math.abs(value))))
  // Reconstruct the square root matrix using U, S, and Vt
  val sqrtMatrix: DenseMatrix[Double] = u * sqrtSingularValues * vt
  // Convert Breeze DenseMatrix back to Spark Matrix
  val sqrtCovMatrix: Matrix = Matrices.dense(sqrtMatrix.rows, sqrtMatrix.cols, sqrtMatrix.toArray)
  // convert to list of array for easy IO
  sqrtCovMatrix.transpose // Transpose since .toArray is column major
    .toArray
    .grouped(sqrtCovMatrix.numCols)
    .toList
}

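A note on the conversion step in calculateCovarianceMatrixSqrt: Breeze's DenseMatrix stores its data in column-major order by default, whereas scaledFeatures.flatten lays the values out row by row. The sketch below, in plain Scala with no Breeze dependency, shows one way the flat array could be reordered before it is handed to a column-major constructor. The names LayoutSketch and toColumnMajor are hypothetical, and the sketch assumes the input is non-empty and rectangular.

```scala
object LayoutSketch {

  /** Flattens a rectangular array of rows into column-major order.
    * Element (r, c) of the input ends up at index r + c * numRows.
    * Assumes `rows` is non-empty; fails if the rows differ in length
    * or if the total element count does not fit in an Int.
    */
  def toColumnMajor(rows: Array[Array[Double]]): Array[Double] = {
    val numRows = rows.length
    val numCols = rows.head.length
    require(rows.forall(_.length == numCols), "all rows must have the same length")
    // multiplyExact throws ArithmeticException instead of silently overflowing
    val out = new Array[Double](java.lang.Math.multiplyExact(numRows, numCols))
    var r = 0
    while (r < numRows) {
      var c = 0
      while (c < numCols) {
        out(r + c * numRows) = rows(r)(c)
        c += 1
      }
      r += 1
    }
    out
  }
}
```

With Breeze on the classpath, the result could be passed to the same constructor used above, for example new DenseMatrix(rows.length, rows.head.length, LayoutSketch.toColumnMajor(rows)), so that row i of the matrix corresponds to row i of the original array.
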
I get the following error when converting the 2D array to a Breeze DenseMatrix:
ERROR Client: Application diagnostics message: User class threw exception: java.lang.IndexOutOfBoundsException: Storage array has size 694391433 but indices can grow as large as 761101526
I have verified that the 2D array has around 300k rows and 251 columns. Can someone please help me figure out what I'm doing wrong?
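
For reference, the element counts implied by these dimensions can be checked with a little arithmetic. The sketch below, in plain Scala, computes in Long how many values a dense rows x cols matrix needs and whether that count fits in a single JVM array, whose length is an Int. The names SizeSketch, elementCount and fitsInArray are hypothetical. The comparison of a d x d result with an n x n result assumes that the covariance routine treats rows as observations and columns as variables, which is worth confirming against the Breeze documentation.

```scala
object SizeSketch {

  /** Number of elements in a dense rows x cols matrix, computed in Long to avoid Int overflow. */
  def elementCount(rows: Int, cols: Int): Long = rows.toLong * cols.toLong

  /** True if the element count is at most Int.MaxValue (the practical JVM array limit is slightly lower). */
  def fitsInArray(rows: Int, cols: Int): Boolean =
    elementCount(rows, cols) <= Int.MaxValue.toLong

  def main(args: Array[String]): Unit = {
    val n = 300000 // approximate row count stated in the question
    val d = 251    // column count stated in the question

    // The scaled feature matrix itself: n observations by d features.
    println(s"n x d: ${elementCount(n, d)} elements, fits in one array: ${fitsInArray(n, d)}")
    // A feature-by-feature covariance matrix.
    println(s"d x d: ${elementCount(d, d)} elements, fits in one array: ${fitsInArray(d, d)}")
    // What a covariance over the transposed input would need (assumption, see lead-in).
    println(s"n x n: ${elementCount(n, n)} elements, fits in one array: ${fitsInArray(n, n)}")
  }
}
```

With the stated dimensions the feature matrix needs 75,300,000 values, which matches neither number in the error message. Printing scaledFeatures.length and the distinct row lengths just before the DenseMatrix is built may help narrow down where the sizes diverge.
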