Spark: Filtering out aggregated data?

There is a table with two columns, books and readers, where books and readers are book and reader IDs, respectively. I need to remove from this table the readers who have read more than 10 books.
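
For concreteness, the data below is assumed to look roughly like this; the Record case class and the sample values are only an illustration, not part of the original code:

    // hypothetical schema for the (reader, book) table
    case class Record(reader: Int, book: Int)

    // data is an RDD[Record]; sc is the SparkContext
    val data = sc.parallelize(Seq(
      Record(reader = 1, book = 101),
      Record(reader = 1, book = 102),
      Record(reader = 2, book = 101)
    ))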

First I group books by reader and get sizes of these groups:

    val byReader = data.map(r => (r.reader, r.book))
    val booksByReader = byReader.groupByKey()
    val booksByReaderCnts = booksByReader.map { case (reader, bookIter) =>
      (reader, bookIter.size)
    }

I run this on a single node and try to cheat the distributed nature of the computation by storing the reader IDs with big book counts in a Scala hash map, as a side effect. I also filter out readers with big book counts in the 'standard Spark way':

    val maxBookCnt = 10
    var hugeBookCntsMap: Map[Int, Int] = Map() // map to store reader IDs with huge book counts
    // Get readers with huge book counts
    val hugeBookCnts = booksByReaderCnts.filter { case (reader, cnt) =>
      hugeBookCntsMap += (reader -> cnt)
      cnt > maxBookCnt
    }

Spark's filtering works as expected and creates an RDD with the (reader, count) pairs of readers with big book counts:

    println("*** Huge cnts has: "+hugeBookCnts.count() + " elements")
    hugeBookCnts.take(100).foreach(println)

On the other hand, the local map remains empty:

    println("*** Map:")
    hugeBookCntsMap.foreach { case (reader, cnt) =>
      println("Reader: " + reader + " Cnt: " + cnt)
    }

Questions:

  My idea was to create a Scala hash map to store the IDs of readers with big book counts. Next, I wanted to filter the original data by checking whether a reader is in that hash map; those in the map should be filtered out. Obviously, the local hash map is not getting any data and can't be used for this purpose. *Main question: how do I filter out the records of readers with big reading lists?*

  1. Why does the local hash map remain empty, given that everything runs on a single node?

  2. Does Spark provide any mechanism to organize hash maps shared between different processes?

1 Answer

Answered by vvladymyrov (best answer):

There is a way to do this with Spark without needing to bring the counts to a single machine:

    // count books by reader using the reduceByKey transformation (thus no action yet)
    // and filter readers with a book count > 10
    val readersWithLotsOfBooksRDD = data
      .map(r => (r.reader, 1))
      .reduceByKey((x, y) => x + y)
      .filter { case (_, x) => x > 10 }
    // produces a PairRDD
    val readersWithBooksRDD = data.map(r => (r.reader, r.book))
    // result
    readersWithBooksRDD.subtractByKey(readersWithLotsOfBooksRDD).collect
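
subtractByKey drops every (reader, book) pair whose reader appears as a key in readersWithLotsOfBooksRDD, so only readers with at most 10 books remain. Note that collect brings the whole result back to the driver, which only makes sense when the filtered data fits in the driver's memory; otherwise keep working with the RDD or save it to distributed storage, as noted at the end of this answer.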

Answering your other questions:

  1. Your code that updates hugeBookCntsMap is executed on the workers, in a different JVM process. Spark doesn't bring such values from the workers back to the driver.

  2. Spark provides several mechanisms for sending values from the driver to the workers and back from the workers to the driver:

    • from driver to workers - closures - the value of hugeBookCntsMap was serialized and sent to the workers, but changes made to hugeBookCntsMap on a worker are not sent back to the driver. Closures are intended for small objects and functions. Read only.
    • from driver to workers - broadcast variables - intended for large objects. Read only. (A sketch follows after this list.)
    • from workers to driver - the collect action or reduce operations - as in my counting code. The workers produce tuples, which are aggregated and then collected back to the driver.
    • from workers to driver - accumulators. But again, it must be possible to aggregate the results.
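
To make the broadcast route concrete, here is a minimal sketch of how the original "local hash map" idea could be made to work; it assumes the same data RDD as in the question, and the variable names are made up:

    // count books per reader, keep only the heavy readers (> 10 books),
    // and bring their IDs back to the driver
    val heavyReaders: Set[Int] = data
      .map(r => (r.reader, 1))
      .reduceByKey(_ + _)
      .filter { case (_, cnt) => cnt > 10 }
      .keys
      .collect()
      .toSet

    // broadcast the (read-only) set from the driver to all workers
    val heavyReadersBC = sc.broadcast(heavyReaders)

    // filter the original records on the workers against the broadcast set
    val lightReadersRDD = data.filter(r => !heavyReadersBC.value.contains(r.reader))

This only works when the set of heavy readers itself is small enough to fit on the driver; the subtractByKey version above avoids that assumption.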

In general, if the output of your application is too big to fit into a single node's memory, save it to HDFS or S3 or some other distributed storage.
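
For example, instead of collect, the result can be written out directly; the output path here is just a placeholder:

    readersWithBooksRDD
      .subtractByKey(readersWithLotsOfBooksRDD)
      .saveAsTextFile("hdfs:///path/to/output")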