Find the size of data stored in an RDD from a text file in Apache Spark


I am new to Apache Spark (version 1.4.1). I wrote some small code to read a text file and store its data in an RDD.

Is there a way to get the size of the data in the RDD?

This is my code:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.SizeEstimator
import org.apache.spark.sql.Row

object RddSize {

  def main(args: Array[String]) {

    val sc = new SparkContext("local", "data size")
    val FILE_LOCATION = "src/main/resources/employees.csv"
    val peopleRdd = sc.textFile(FILE_LOCATION)

    val newRdd = peopleRdd.filter(str => str.contains(",M,"))
    // Here I want to find the size of the remaining data
  }
} 

I want to get the size of the data before the filter transformation (peopleRdd) and after it (newRdd).


There are 3 answers

Gabber (Best Answer)

There are multiple ways to get the RDD size:

1. Add a Spark listener to your Spark context:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// register the listener on the question's SparkContext (sc)
sc.addSparkListener(new SparkListener() {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    // print the reported in-memory and on-disk size of each RDD in the completed stage
    stageCompleted.stageInfo.rddInfos.foreach { info =>
      println("rdd memSize " + info.memSize)
      println("rdd diskSize " + info.diskSize)
    }
  }
})
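
For the sizes to show up, the RDDs have to be tracked by the block manager; a minimal usage sketch, assuming the question's RDDs and that you persist them before running an action:

// Hedged usage sketch: cache the RDDs so the block manager tracks their blocks,
// then run an action so a stage completes and the listener above fires.
peopleRdd.cache()
newRdd.cache()
newRdd.count()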

2. Save your RDD as a text file:

myRDD.saveAsTextFile("person.txt")

and then call the Apache Spark REST API:

/applications/[app-id]/stages
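
As a rough sketch (assuming the driver UI runs on the default port 4040 and the application id is taken from sc.applicationId), the endpoint can be queried with plain Scala:

import scala.io.Source

// The monitoring REST API was introduced in Spark 1.4 and is enabled by default.
val appId = sc.applicationId                     // e.g. "local-1439649523879"
val url = s"http://localhost:4040/api/v1/applications/$appId/stages"
val stagesJson = Source.fromURL(url).mkString    // raw JSON describing all stages
println(stagesJson)                              // look at inputBytes / outputBytes per stage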

3. You can also try SizeEstimator:

val rddSize = SizeEstimator.estimate(myRDD)
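
A hedged usage note: SizeEstimator.estimate measures the object you pass it on the driver, so applied directly to an RDD reference it estimates the RDD object rather than the distributed data; a collected sample gives a more direct estimate (the 10% fraction and seed below are arbitrary choices):

import org.apache.spark.util.SizeEstimator

// Estimate the in-memory size of a driver-side sample of the data.
val sample = newRdd.sample(withReplacement = false, fraction = 0.1, seed = 42L).collect()
val sampleBytes = SizeEstimator.estimate(sample)
println(s"estimated size of a 10% sample: $sampleBytes bytes")
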
Patrick McGloin

I'm not sure you need to do this. You could cache the RDD and check the size in the Spark UI. But let's say that you do want to do this programmatically; here is a solution.

def calcRDDSize(rdd: RDD[String]): Long = {
  // map each line to its size in bytes (UTF-8 is the default encoding)
  rdd.map(_.getBytes("UTF-8").length.toLong)
     .reduce(_ + _) // add the sizes together
}

You can then call this function for your two RDDs:

println(s"peopleRdd is [${calcRDDSize(peopleRdd)}] bytes in size")
println(s"newRdd is [${calcRDDSize(newRdd)}] bytes in size")

This solution should work even if the file size is larger than the memory available in the cluster.
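
One edge case worth noting: reduce throws an exception on an empty RDD (for example, if the filter matches nothing), so a variant built on fold with a zero value is safer; calcRDDSizeSafe is just an illustrative name:

// Hedged variant of the function above: fold with a 0L start value returns 0
// for an empty RDD instead of throwing an exception.
def calcRDDSizeSafe(rdd: RDD[String]): Long =
  rdd.map(_.getBytes("UTF-8").length.toLong).fold(0L)(_ + _)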

Little Bobby Tables

The Spark API doc says that:

  1. You can get info about your RDDs from the Spark context: sc.getRDDStorageInfo
  2. The RDD info includes memory and disk size: see the RDDInfo doc (a short sketch follows below)
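
A minimal sketch putting these together, assuming the question's sc and RDDs; getRDDStorageInfo only reports RDDs that have been persisted and materialized, so the cache() and count() calls are part of that assumption:

// Persist and materialize so storage info is populated.
peopleRdd.cache()
newRdd.cache()
newRdd.count()

// Each RDDInfo carries the in-memory and on-disk size of the cached blocks.
sc.getRDDStorageInfo.foreach { info =>
  println(s"RDD '${info.name}' (id ${info.id}): " +
    s"memSize=${info.memSize} bytes, diskSize=${info.diskSize} bytes")
}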