Scala Spark distributed run on Google Cloud Platform, but workers not working


I'm a newbie to Scala Spark programming. I have to build a recommendation system for movies in Scala Spark using Google Cloud Platform. The dataset is composed of (movie_id, user_id, rating) triples and has about 100,000,000 rows. The following code runs only on the master node; the CPU usage of the two worker nodes stays stable at around 2%.

The cluster consists of 1 master node with 2 cores and 16 GB RAM, and 2 worker nodes with 5 cores and 32 GB RAM each.
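
A minimal diagnostic sketch (not part of the job itself; the ExecutorCheck name and everything in it are placeholders) that can be submitted the same way as the job, to check how many executors actually register with the application:

import org.apache.spark.sql.SparkSession

object ExecutorCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("ExecutorCheck")
      .getOrCreate()

    val sc = spark.sparkContext

    // Every executor that registered with the application shows up here (the driver is included)
    val executors = sc.statusTracker.getExecutorInfos
    println(s"Registered executors (driver included): ${executors.length}")
    executors.foreach(e => println(s"  host=${e.host} runningTasks=${e.numRunningTasks}"))

    // The total number of cores Spark thinks it can schedule tasks on
    println(s"Default parallelism: ${sc.defaultParallelism}")

    spark.stop()
  }
}

If only the driver shows up in that list, the issue would likely be in how the job is submitted rather than in the code itself. The full job is the following: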

import com.google.cloud.storage.{BlobId, StorageOptions}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}
import org.apache.spark.SparkConf

object CollaborativeFilteringDF {
  def predict(targetUser: String, testUserMovies: DataFrame, similarity: DataFrame, ratingsDF: DataFrame): DataFrame = {
    val similarUsers = similarity
      .filter(col("user_id1") === targetUser)
      .orderBy(col("cosine_similarity").desc)
      .limit(10)
      .select("user_id1", "user_id2")


    // Find the movies rated by the similar users that also appear in testUserMovies
    val recommendations = similarUsers
      .join(ratingsDF, similarUsers("user_id2") === ratingsDF("user_id"), "inner")
      .join(testUserMovies, Seq("movie_id"), "inner")
      .groupBy("movie_id", "user_id1")
      .agg(avg(col("rating").cast("Double")).alias("predicted_rating"))

    recommendations
  }


  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    var bucketName = "bucket"
    var trainName = "train.csv"
    var testName = "test.csv"

    if (args.length > 0) {
      bucketName = args(0)
      if (args.length > 2) {
        trainName = args(1)
        testName = args(2)
      }
    }

    val basePath = s"gs://$bucketName"
    val trainPath = s"$basePath/$trainName"
    val testPath = s"$basePath/$testName"

    val conf = new SparkConf()
      .setAppName("CollaborativeFilteringDF")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
      .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")

    // Initialize a Spark session
    val spark = SparkSession.builder.config(conf).getOrCreate()

    import spark.implicits._

    val sc = spark.sparkContext


    println(s"Read train: $trainName")
    // Read the movie ratings dataset
    val ratings = spark.read
      .option("header", true)
      .csv(trainPath)
      .repartition(4)

    println(s"Read test: $testName")
    val test = spark.read
      .option("header", true)
      .csv(testPath)


    val t0 = System.nanoTime()

    // Cast the rating column to Double
    val ratingsDF = ratings
      .select(col("movie_id"), col("user_id"), col("rating").cast("Double"))

    var numPartitions = ratings.rdd.getNumPartitions
    println(s"Numero di partizioni di ratingsDF: $numPartitions")

    println("Compute user product...")
    // Compute pairwise rating products for pairs of users who rated the same movie
    val userProduct = ratingsDF
      .alias("u1")
      .join(ratingsDF.alias("u2"), Seq("movie_id"))
      .where(col("u1.user_id") =!= col("u2.user_id"))
      .select(col("u1.user_id").alias("user_id1"), col("u2.user_id").alias("user_id2"), col("u1.rating").alias("rating1"), col("u2.rating").alias("rating2"))

    println("Compute Similarity...")
    // Compute the similarity between users using cosine similarity
    val similarity = userProduct
      .groupBy("user_id1", "user_id2")
      .agg(
        (sum(col("rating1") * col("rating2")) / (sqrt(sum(col("rating1") * col("rating1"))) * sqrt(sum(col("rating2") * col("rating2"))))).alias("cosine_similarity")
      )
      .repartition(4)

    numPartitions = similarity.rdd.getNumPartitions
    println(s"Numero di partizioni di similarity: $numPartitions")

    println("Compute recommendations...")

    val allUserRecommendations = test
      .select("user_id")
      .distinct()
      .as[String]
      .collect()
      .map(user_id => predict(user_id, test.select("movie_id").where(col("user_id") === user_id), similarity, ratingsDF))
      .reduce((df1, df2) => df1.union(df2))
      .repartition(4)


    println("Add true value column...")
    val recommendationToEvaluate = allUserRecommendations.alias("uRec")
      .join(test.alias("test"), col("uRec.movie_id") === col("test.movie_id") and (col("uRec.user_id1") === col("test.user_id")), "inner")
      .select("user_id", "test.movie_id", "predicted_rating", "rating")
      .withColumn("rating", col("rating").cast("Double"))


    // Evaluate the model
    println("Evaluate...")
    val errors = recommendationToEvaluate
      .select("predicted_rating", "rating")
      .rdd
      .map { row =>
        val predictedRating = row.getAs[Double]("predicted_rating") // Explicit cast to Double
        val actualRating = row.getAs[Double]("rating") // Explicit cast to Double
        val squaredError = math.pow(predictedRating - actualRating, 2)
        val absoluteError = math.abs(predictedRating - actualRating)
        (squaredError, absoluteError)
      }


    // Check that there is data before computing the mean
    if (!errors.isEmpty()) {
      // Compute the mean of the errors across all partitions
      val meanSquaredError = errors.map(_._1).mean()
      val meanAbsoluteError = errors.map(_._2).mean()

      // Compute the root mean squared error (RMSE)
      val rmse = math.sqrt(meanSquaredError)

      println(s"RMSE: $rmse")
      println(s"MAE: $meanAbsoluteError")
    } else {
      println("Nessun dato disponibile per il calcolo degli errori.")
    }

    val t1 = System.nanoTime()
    val time = (t1 - t0) / 1000000000
    println(s"Elapsed time:\t" + time + "sec (" + (t1 - t0) + "ns)")


    // Close the Spark session
    spark.close()
  }
}