There is a table with two columns, books and readers of these books, where books and readers are book and reader IDs, respectively. I need to remove from this table the readers who have read more than 10 books.
First, I group books by reader and get the sizes of these groups:
// Pair each record as (reader, book), group by reader, and count the books per reader
val byReader = data.map(r => (r.reader, r.book))
val booksByReader = byReader.groupByKey()
val booksByReaderCnts = booksByReader.map { case (reader, bookIter) => (reader, bookIter.size) }
I run this on a single node and try to cheat the distributed nature of the computation by storing reader IDs with big book counts in a local Scala hash map, as a side effect. I also filter out readers with big book counts in the 'standard Spark way':
val maxBookCnt = 10
var hugeBookCntsMap: Map[Int, Int] = Map() // map to store reader IDs with huge book counts

// Get readers with huge book counts, recording every (reader, cnt) pair in the local map as a side effect
val hugeBookCnts = booksByReaderCnts.filter { case (reader, cnt) =>
  hugeBookCntsMap += (reader -> cnt)
  cnt > maxBookCnt
}
Spark filtering works as expected and creates an RDD with the (reader, count) pairs for readers with big book counts:
println("*** Huge cnts has: "+hugeBookCnts.count() + " elements")
hugeBookCnts.take(100).foreach(println)
On the other hand, the local map remains empty:
println("*** Map:")
hugeBookCntsMap.map(tuple => tuple match {
case (reader: Int, cnt: Int) => println("Reader: " + reader + " Cnt: " + cnt)
})
Questions:
My idea was to create a Scala hash map to store the IDs of readers with big book counts. Next, I wanted to filter the original data by checking whether each reader is in the hash map; those in the map should be filtered out. Obviously, the local hash map is not getting any data and can't be used for this purpose. *Main question: how do I filter out the records of readers with big reading lists?*
Why does the local hash map remain empty, given that everything runs on a single node?
Does Spark provide any mechanism for sharing hash maps between different processes?
There is a way to do it with Spark without the need to bring the counts to a single machine.
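One possible sketch, assuming byReader is the (reader, book) pair RDD built in the question: reduceByKey keeps the counts distributed, and the join keeps only readers whose count is within the limit.

val maxBookCnt = 10

// Count books per reader without collecting anything to the driver
val cntsByReader = byReader.mapValues(_ => 1).reduceByKey(_ + _)

// Keep only readers whose count is within the limit
val smallReaders = cntsByReader.filter { case (_, cnt) => cnt <= maxBookCnt }

// Join back to the original pairs; readers with big book counts simply disappear
val filtered = byReader.join(smallReaders)
                       .map { case (reader, (book, _)) => (reader, book) }

The same result can be obtained from booksByReader by filtering on the group size and flatMapping the groups back out, but reduceByKey avoids materializing the full book list for every reader.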
Answering your other questions:
Your code that updates hugeBookCntsMap is executed on the workers, in a different JVM process; each task works on its own deserialized copy of the closure, and Spark doesn't bring those values from the workers back to the driver.
Spark does provide mechanisms for moving values between the driver and the workers: broadcast variables send read-only values from the driver to the workers, and accumulators bring values from the workers back to the driver.
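For example, a broadcast variable could be used to ship the set of heavy readers to every worker. This is only a sketch and assumes that set is small enough to be collected to the driver (sc is the SparkContext):

// Collect only the heavy reader IDs (assumed to be a small set) and broadcast them
val heavyReaders = booksByReaderCnts.filter { case (_, cnt) => cnt > maxBookCnt }
                                    .keys
                                    .collect()
                                    .toSet
val heavyReadersBc = sc.broadcast(heavyReaders)

// Drop every (reader, book) pair whose reader is in the broadcast set
val filteredViaBroadcast = byReader.filter { case (reader, _) => !heavyReadersBc.value.contains(reader) }

The join-based version above does not even need that assumption, since nothing is collected to the driver.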
In general, if the output of your application is too big to fit into a single node's memory, save it to HDFS, S3, or some other distributed storage.
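For instance (the output path here is just a placeholder):

filtered.saveAsTextFile("hdfs:///path/to/output")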