I'm new to Scala Spark programming. I have to build a movie recommendation system in Scala Spark on Google Cloud Platform. The dataset consists of (movie_id, user_id, rating) rows, about 100,000,000 of them. The following code runs only on the master node: the CPU usage of the two worker nodes stays stable at around 2%.
Cluster configuration:
- 1 master node with 2 cores and 16 GB RAM
- 2 worker nodes with 5 cores and 32 GB RAM each
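For reference, this is the kind of explicit executor sizing I understand can be requested through SparkConf; it is only a sketch, the values are illustrative guesses based on the worker sizes above and not settings I have verified (on Dataproc the defaults may differ):
import org.apache.spark.SparkConf
val sizingConf = new SparkConf()
  .setAppName("CollaborativeFilteringDF")
  .set("spark.executor.instances", "4") // illustrative: 2 executors per worker node
  .set("spark.executor.cores", "2")     // illustrative
  .set("spark.executor.memory", "12g")  // illustrative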
import com.google.cloud.storage.{BlobId, StorageOptions}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}
import org.apache.spark.SparkConf
object CollaborativeFilteringDF {
def predict(targetUser: String, testUserMovies: DataFrame, similarity: DataFrame, ratingsDF: DataFrame): DataFrame = {
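// Keep the 10 users most similar to the target user, ranked by cosine similarity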
val similarUsers = similarity
.filter(col("user_id1") === targetUser)
.orderBy(col("cosine_similarity").desc)
.limit(10)
.select("user_id1", "user_id2")
// Find the movies rated by the similar users that also appear in testUserMovies
val recommendations = similarUsers
.join(ratingsDF, similarUsers("user_id2") === ratingsDF("user_id"), "inner")
.join(testUserMovies, Seq("movie_id"), "inner")
.groupBy("movie_id", "user_id1")
.agg(avg(col("rating").cast("Double")).alias("predicted_rating"))
recommendations
}
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
var bucketName = "bucket"
var trainName = "train.csv"
var testName = "test.csv"
if (args.length > 0) {
bucketName = args(0)
if (args.length > 2) {
trainName = args(1)
testName = args(2)
}
}
val basePath = s"gs://$bucketName"
val trainPath = s"$basePath/$trainName"
val testPath = s"$basePath/$testName"
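// Spark configuration: Kryo serialization and the GCS connector for gs:// paths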
val conf = new SparkConf()
.setAppName("CollaborativeFilteringDF")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
.set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
// Initialize a Spark session
val spark = SparkSession.builder.config(conf).getOrCreate()
import spark.implicits._
val sc = spark.sparkContext
println(s"Read train: $trainName")
// Read the movie ratings dataset
val ratings = spark.read
.option("header", true)
.csv(trainPath)
.repartition(4)
println(s"Read test: $testName")
val test = spark.read
.option("header", true)
.csv(testPath)
val t0 = System.nanoTime()
// Cast the rating column to Double
val ratingsDF = ratings
.select(col("movie_id"), col("user_id"), col("rating").cast("Double"))
var numPartitions = ratings.rdd.getNumPartitions
println(s"Numero di partizioni di ratingsDF: $numPartitions")
println("Compute user product...")
// Pair the ratings of every two distinct users on the movies they have both rated
val userProduct = ratingsDF
.alias("u1")
.join(ratingsDF.alias("u2"), Seq("movie_id"))
.where(col("u1.user_id") =!= col("u2.user_id"))
.select(col("u1.user_id").alias("user_id1"), col("u2.user_id").alias("user_id2"), col("u1.rating").alias("rating1"), col("u2.rating").alias("rating2"))
println("Compute Similarity...")
// Compute user-user similarity with cosine similarity
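// cosine_similarity(u1, u2) = sum(r1 * r2) / (sqrt(sum(r1^2)) * sqrt(sum(r2^2))), over movies rated by both users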
val similarity = userProduct
.groupBy("user_id1", "user_id2")
.agg(
(sum(col("rating1") * col("rating2")) / (sqrt(sum(col("rating1") * col("rating1"))) * sqrt(sum(col("rating2") * col("rating2"))))).alias("cosine_similarity")
)
.repartition(4)
numPartitions = similarity.rdd.getNumPartitions
println(s"Numero di partizioni di similarity: $numPartitions")
println("Compute recommendations...")
val allUserRecommendations = test
.select("user_id")
.distinct()
.as[String]
.collect()
.map(user_id => predict(user_id, test.select("movie_id").where(col("user_id") === user_id), similarity, ratingsDF))
.reduce((df1, df2) => df1.union(df2))
.repartition(4)
println("Add true value column...")
val recommendationToEvaluate = allUserRecommendations.alias("uRec")
.join(test.alias("test"), col("uRec.movie_id") === col("test.movie_id") and (col("uRec.user_id1") === col("test.user_id")), "inner")
.select("user_id", "test.movie_id", "predicted_rating", "rating")
.withColumn("rating", col("rating").cast("Double"))
// Evaluate the model
println("Evaluate...")
val errors = recommendationToEvaluate
.select("predicted_rating", "rating")
.rdd
.map { row =>
val predictedRating = row.getAs[Double]("predicted_rating") // read as Double
val actualRating = row.getAs[Double]("rating") // read as Double
val squaredError = math.pow(predictedRating - actualRating, 2)
val absoluteError = math.abs(predictedRating - actualRating)
(squaredError, absoluteError)
}
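// isEmpty() and the mean() calls below are Spark actions, each triggering a job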
// Make sure there is data before computing the averages
if (!errors.isEmpty()) {
// Average the errors across all partitions
val meanSquaredError = errors.map(_._1).mean()
val meanAbsoluteError = errors.map(_._2).mean()
// Root mean squared error (RMSE)
val rmse = math.sqrt(meanSquaredError)
println(s"RMSE: $rmse")
println(s"MAE: $meanAbsoluteError")
} else {
println("Nessun dato disponibile per il calcolo degli errori.")
}
val t1 = System.nanoTime()
val time = (t1 - t0) / 1000000000
println(s"Elapsed time:\t" + time + "sec (" + (t1 - t0) + "ns)")
// Close the Spark session
spark.close()
}
}