OutOfBoundsException with ALS - Flink MLlib


I'm building a recommendation system for movies, using the MovieLens datasets available here: http://grouplens.org/datasets/movielens/

To compute this recommendation system, I use Flink's ML library in Scala, and in particular the ALS algorithm (org.apache.flink.ml.recommendation.ALS).

I first map the movie ratings into a DataSet[(Int, Int, Double)] and then create a training set and a test set (see the code below).

My problem is that ALS.fit works fine on the whole dataset (all the ratings), but if I remove just one rating, the fit function fails, and I don't understand why.

Do you have any ideas? :)

Code used:

Rating.scala

case class Rating(userId: Int, movieId: Int, rating: Double)

PreProcessing.scala

object PreProcessing {

  def getRatings(env: ExecutionEnvironment, ratingsPath: String): DataSet[Rating] = {
    env.readCsvFile[(Int, Int, Double)](
      ratingsPath, ignoreFirstLine = true,
      includedFields = Array(0, 1, 2)).map { r => Rating(r._1, r._2, r._3) }
  }
}

Processing.scala

object Processing {
  private val ratingsPath: String = "Path_to_ratings.csv"

  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val ratings: DataSet[Rating] = PreProcessing.getRatings(env, ratingsPath)

    val trainingSet: DataSet[(Int, Int, Double)] =
      ratings
        .map(r => (r.userId, r.movieId, r.rating))
        .sortPartition(0, Order.ASCENDING)
        .first(ratings.count().toInt)

    val als = ALS()
     .setIterations(10)
     .setNumFactors(10)
     .setBlocks(150)
     .setTemporaryPath("/tmp/tmpALS")

    val parameters = ParameterMap()
     .add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
     .add(ALS.Seed, 42L)

    als.fit(trainingSet, parameters)
  }
}

"But if I just remove only one rating"

val trainingSet : DataSet[(Int, Int, Double)] =
  ratings
    .map(r => (r.userId, r.movieId, r.rating))
    .sortPartition(0, Order.ASCENDING)
    .first((ratings.count()-1).toInt)

The error :

06/19/2015 15:00:24 CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:570))(4/4) switched to FAILED

java.lang.ArrayIndexOutOfBoundsException: 5
    at org.apache.flink.ml.recommendation.ALS$BlockRating.apply(ALS.scala:358)
    at org.apache.flink.ml.recommendation.ALS$$anon$111.coGroup(ALS.scala:635)
    at org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:152)
    ...

Answered by Till Rohrmann (accepted answer):

The problem is the first operator in combination with the setTemporaryPath parameter of Flink's ALS implementation. In order to understand the problem, let me quickly explain how the blocking ALS algorithm works.

The blocking implementation of alternating least squares first partitions the given ratings matrix user-wise and item-wise into blocks. For these blocks, routing information is calculated. This routing information says which user/item block receives which input from which item/user block, respectively. Afterwards, the ALS iteration is started.
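The partitioning and routing step can be sketched in plain Scala. This is a conceptual illustration only, not Flink's actual implementation: the modulo block assignment, the block count, and all helper names are assumptions made for the example.

```scala
// Conceptual sketch of blocking ALS pre-processing (not Flink's code):
// ratings are assigned to user and item blocks, and for each user block
// we record which item blocks it needs to exchange data with.
object BlockingSketch {
  case class Rating(userId: Int, itemId: Int, rating: Double)

  // Assumed block assignment: simple modulo hashing.
  def userBlock(userId: Int, numBlocks: Int): Int = math.abs(userId) % numBlocks
  def itemBlock(itemId: Int, numBlocks: Int): Int = math.abs(itemId) % numBlocks

  // "Routing information": for each user block, the set of item blocks
  // from which it must receive input during an ALS iteration.
  def userRouting(ratings: Seq[Rating], numBlocks: Int): Map[Int, Set[Int]] =
    ratings
      .groupBy(r => userBlock(r.userId, numBlocks))
      .map { case (ub, rs) => ub -> rs.map(r => itemBlock(r.itemId, numBlocks)).toSet }
}
```

The symmetric item-side routing is computed the same way with the roles of users and items swapped.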

Since Flink's underlying execution engine is a parallel streaming dataflow engine, it tries to execute as many parts of the dataflow as possible in a pipelined fashion. This requires all operators of the pipeline to be online at the same time. The advantage is that Flink avoids materializing intermediate results, which might be prohibitively large. The disadvantage is that the available memory has to be shared among all running operators. In the case of ALS, where the individual DataSet elements (e.g. user/item blocks) are rather large, this is undesirable.

In order to solve this problem, not all operators of the implementation are executed at the same time if you have set a temporaryPath. The path defines where the intermediate results can be stored. Thus, if you've defined a temporary path, ALS first calculates the routing information for the user blocks and writes it to disk, then calculates the routing information for the item blocks and writes it to disk, and finally starts the ALS iteration, for which it reads the routing information back from the temporary path.
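As a rough illustration of this staged execution, here is a plain-Scala toy (not Flink's actual code; the "routing" computations, file names, and stage contents are all invented for the example): each stage materializes its result on disk, and a later stage starts from the materialized file instead of sharing a live pipeline with the earlier stage.

```scala
import java.nio.file.{Files, Paths}

// Toy model of setTemporaryPath-style staged execution (not Flink code):
// stage 1 and stage 2 each write their result to disk, and stage 3 reads
// both materialized results back instead of running in one pipeline.
object StagedExecution {
  def runStaged(tmpDir: String, ratings: Seq[(Int, Int, Double)]): Seq[String] = {
    val dir = Files.createDirectories(Paths.get(tmpDir))

    // Stage 1: compute a stand-in for the user routing info and materialize it.
    val userRouting = ratings.map(_._1).distinct.sorted
    Files.write(dir.resolve("userRouting"), userRouting.mkString(",").getBytes)

    // Stage 2: compute a stand-in for the item routing info and materialize it.
    val itemRouting = ratings.map(_._2).distinct.sorted
    Files.write(dir.resolve("itemRouting"), itemRouting.mkString(",").getBytes)

    // Stage 3: the "iteration" reads both materialized results back from disk.
    Seq("userRouting", "itemRouting").map { name =>
      new String(Files.readAllBytes(dir.resolve(name)))
    }
  }
}
```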

The calculations of the routing information for the user and item blocks both depend on the given ratings data set. In your case, when the user routing information is calculated, Flink first reads the ratings data set and applies the first operator to it. The first operator returns n arbitrary elements from the underlying data set. The problem is that Flink does not store the result of this first operation for the calculation of the item routing information. Instead, when the calculation of the item routing information starts, Flink re-executes the dataflow from its sources. This means that it reads the ratings data set from disk and applies the first operator again. In many cases this gives you a different set of ratings than the result of the first first operation. Therefore, the generated routing information is inconsistent and ALS fails.
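The effect can be simulated in plain Scala: if each execution of the source sees the elements in a different order, taking the first n elements yields a different subset each time. The shuffling below stands in for Flink's non-deterministic parallel reads; it is an analogy, not Flink code.

```scala
import scala.util.Random

// Simulation of a non-deterministic first(n): each "execution" (seed)
// observes the ratings in a different order, so taking the first n
// elements selects a different subset of the data.
object FirstIsNondeterministic {
  val ratings: Seq[(Int, Int, Double)] = (1 to 6).map(i => (i, i, i.toDouble))

  // One "execution" of the dataflow: shuffle models the unordered parallel
  // read, take(n) models the first operator.
  def firstN(seed: Long, n: Int): Set[(Int, Int, Double)] =
    new Random(seed).shuffle(ratings).take(n).toSet
}
```

Run with `count - 1` (here, 5 of 6 elements): different executions drop different ratings, which is exactly why the two routing calculations see inconsistent inputs.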

You can circumvent the problem by materializing the result of the first operator and using that result as the input for the ALS algorithm. The object FlinkMLTools contains a method persist which takes a DataSet, writes it to the given path, and then returns a new DataSet which reads the just-written data. This allows you to break up the resulting dataflow graph.

val firstTrainingSet : DataSet[(Int, Int, Double)] =
  ratings
    .map(r => (r.userId, r.movieId, r.rating))
    .first((ratings.count()-1).toInt)

val trainingSet = FlinkMLTools.persist(firstTrainingSet, "/tmp/tmpALS/training")

val als = ALS()
  .setIterations(10)
  .setNumFactors(10)
  .setBlocks(150)
  .setTemporaryPath("/tmp/tmpALS/")

val parameters = ParameterMap()
  .add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
  .add(ALS.Seed, 42L)

als.fit(trainingSet, parameters)

Alternatively, you can try leaving the temporaryPath unset. Then all steps (routing information calculation and ALS iteration) are executed in a pipelined fashion, meaning that both the user and item routing information calculations use the same input data set resulting from the first operator.

The Flink community is currently working on keeping intermediate results of operators in memory. This will make it possible to pin the result of the first operator so that it is not calculated twice and, thus, does not give differing results due to its non-deterministic nature.