Performance benefits of DataSet over RDD


After reading a few great articles (this, this and this) about Spark's Datasets, I ended up with the following list of Dataset performance benefits over RDD:

  1. Logical and physical plan optimization;
  2. Strict typing;
  3. Vectorized operations;
  4. Low level memory management.

Questions:

  1. Spark's RDD also builds a physical plan and can combine/optimize multiple transformations within the same stage. Then what is the benefit of a Dataset over an RDD?
  2. From the first link you can see an example of RDD[Person]. Does Dataset have more advanced typing?
  3. What do they mean by "vectorized operations"?
  4. As I understand it, the Dataset's low-level memory management = advanced serialization. That means off-heap storage of serializable objects, where you can read a single field of an object without deserialization. But what about the situation when you use the MEMORY_ONLY persistence strategy? Will the Dataset serialize everything anyway? Will it have any performance benefit over an RDD?
1 Answer

zero323 (best answer):

Spark's RDD also builds a physical plan and can combine/optimize multiple transformations within the same stage. Then what is the benefit of a Dataset over an RDD?

When working with RDDs, what you write is what you get. While certain transformations are optimized by chaining, the execution plan is a direct translation of the DAG. For example:

rdd.mapPartitions(f).mapPartitions(g).mapPartitions(h).shuffle()

where shuffle is an arbitrary shuffling transformation (*byKey, repartition, etc.), all three mapPartitions calls (and similarly map, flatMap, filter) will be chained without creating intermediate objects, but they cannot be rearranged.
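Within a single partition, this chaining can be sketched in plain Scala (no Spark needed): each mapPartitions stage just wraps the previous iterator, so the three functions compose lazily without materializing intermediate collections. The object and function bodies below are made up for illustration:

```scala
// Sketch of how chained mapPartitions calls behave within one partition:
// each stage lazily wraps the previous iterator, so no intermediate
// collection is materialized between f, g and h.
object ChainedPartitions {
  def f(it: Iterator[Int]): Iterator[Int]    = it.map(_ + 1)         // like map
  def g(it: Iterator[Int]): Iterator[Int]    = it.filter(_ % 2 == 0) // like filter
  def h(it: Iterator[Int]): Iterator[String] = it.map(n => s"v$n")   // like map

  // Equivalent of rdd.mapPartitions(f).mapPartitions(g).mapPartitions(h)
  // restricted to a single partition: plain function composition.
  def pipeline(partition: Iterator[Int]): Iterator[String] =
    h(g(f(partition)))
}
```

Crucially, the composed pipeline pulls elements through one at a time; what it cannot do is reorder f, g and h, which is exactly the limitation described above.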

Compared to that, Datasets use a significantly more restrictive programming model, but can optimize execution using a number of techniques, including:

  • Selection (filter) pushdown. For example:

    df.withColumn("foo", col("bar") + 1).where(col("bar").isNotNull())
    

    can be executed as:

    df.where(col("bar").isNotNull()).withColumn("foo", col("bar") + 1)
    
  • Early projections (select) and eliminations. For example:

    df.withColumn("foo", col("bar") + 1).select("foo", "bar")
    

    can be rewritten as:

    df.select("bar").withColumn("foo", col("bar") + 1)
    

    to avoid fetching and passing obsolete data. In the extreme case it can eliminate a particular transformation completely:

    df.withColumn("foo", col("bar") + 1).select("bar")
    

    can be optimized to

    df.select("bar")
    

These optimizations are possible for two reasons:

  • A restrictive data model, which enables dependency analysis without complex and unreliable static code analysis.
  • Clear operator semantics. Operators are free of side effects, and deterministic and nondeterministic ones are clearly distinguished.
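As a rough illustration of the pushdown example above, here is a toy rule-based rewrite over a miniature plan tree. This is not Catalyst's actual API — the types and names are invented, and real predicate pushdown handles far more cases — but the core trick is the same:

```scala
// Toy rule-based rewrite (not Catalyst): push a filter below a
// column-adding projection when the predicate does not reference the
// freshly added column - the same rewrite as the df examples above.
sealed trait Plan
case class Scan(columns: Set[String])                   extends Plan
case class WithColumn(name: String, child: Plan)        extends Plan
case class Filter(referenced: Set[String], child: Plan) extends Plan

object PushdownSketch {
  def pushDown(plan: Plan): Plan = plan match {
    case Filter(refs, WithColumn(name, child)) if !refs(name) =>
      // Safe to swap: the filter only needs columns that exist below.
      WithColumn(name, pushDown(Filter(refs, child)))
    case Filter(refs, child)     => Filter(refs, pushDown(child))
    case WithColumn(name, child) => WithColumn(name, pushDown(child))
    case scan: Scan              => scan
  }
}
```

Applied to the plan for df.withColumn("foo", ...).where(col("bar").isNotNull()), this produces the reordered plan with the filter directly above the scan — a miniature version of what Catalyst's predicate-pushdown rules do.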

To make it clear, let's say we have the following data model:

case class Person(name: String, surname: String, age: Int)

val people: RDD[Person] = ???

And we want to retrieve surnames of all people older than 21. With RDD it can be expressed as:

people
  .map(p => (p.surname, p.age))          // f
  .filter { case (_, age) => age > 21 }  // g

Now let's ask ourselves a few questions:

  • What is the relationship between the age field read in f and the age variable matched in g?
  • Is f and then g the same as g and then f?
  • Are f and g free of side effects?

While the answer is obvious for a human reader, it is not for a hypothetical optimizer. Compare that with the DataFrame version:

people.toDF
  .select(col("surname"), col("age"))    // f'
  .where(col("age") > 21)                // g'

the answers are clear for both optimizer and human reader.
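To make the contrast concrete, here is a toy sketch: the DataFrame predicate is plain data (a small expression tree) that an optimizer can pattern-match to answer exactly these questions, whereas an RDD closure is opaque bytecode. The Expr mini-AST below is invented for illustration and is vastly simpler than Catalyst's real expression classes:

```scala
object ExprSketch {
  // col("age") > 21 represented as inspectable data, not an opaque closure.
  sealed trait Expr
  case class Column(name: String)        extends Expr
  case class Lit(value: Int)             extends Expr
  case class Gt(left: Expr, right: Expr) extends Expr

  val predicate: Expr = Gt(Column("age"), Lit(21))

  // An optimizer can now answer "which columns does this filter need?"
  // by walking the tree - no static analysis of JVM bytecode required.
  def references(e: Expr): Set[String] = e match {
    case Column(n) => Set(n)
    case Lit(_)    => Set.empty
    case Gt(l, r)  => references(l) ++ references(r)
  }
}
```

Because the predicate only references age, the optimizer can safely conclude that the where can be reordered with a projection that keeps age — the very question it could not answer about the closure in g.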

This has some further consequences when using statically typed Datasets (Spark 2.0 Dataset vs DataFrame).

Does Dataset have more advanced typing?

  • No - if you care about optimizations. The most advanced optimizations are limited to Dataset[Row], and at this moment it is not possible to encode a complex type hierarchy.
  • Maybe - if you accept the overhead of the Kryo or Java encoders.

What do they mean by "vectorized operations"?

In the context of optimization we usually mean loop vectorization / loop unrolling. Spark SQL uses code generation to create a compiler-friendly version of the high-level transformations, which can be further optimized to take advantage of vectorized instruction sets.
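A minimal sketch of what that fusion buys (hand-written here; not actual generated code): whole-stage code generation aims to collapse a chain of operators into one tight loop over primitives, which the JIT can unroll and potentially compile to SIMD instructions:

```scala
object CodegenSketch {
  // High-level version: separate operators with boxed iterators in between,
  // analogous to chained Dataset transformations before code generation.
  def highLevel(xs: Array[Int]): Long =
    xs.iterator.map(_ + 1).filter(_ % 2 == 0).map(_.toLong).sum

  // "Generated" version: the same logic fused into one branch-light loop
  // over a flat primitive array - the shape a JIT can vectorize.
  def fused(xs: Array[Int]): Long = {
    var sum = 0L
    var i = 0
    while (i < xs.length) {
      val v = xs(i) + 1
      if (v % 2 == 0) sum += v
      i += 1
    }
    sum
  }
}
```

Both functions compute the same result; the fused form simply removes the virtual calls and boxing that stand between the compiler and a vectorized loop.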

As I understand it, the Dataset's low-level memory management = advanced serialization.

Not exactly. The biggest advantage of using native allocation is escaping the garbage-collector loop. Since garbage collection is quite often a limiting factor in Spark, this is a huge improvement, especially in contexts which require large data structures (like preparing shuffles).

Another important aspect is columnar storage, which enables effective compression (and therefore a potentially lower memory footprint) and optimized operations on compressed data.
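A minimal sketch of the idea, using a direct ByteBuffer as a stand-in for Tungsten's native allocation (the class and method names are made up): each column lives in its own off-heap buffer, so reading one field never deserializes whole objects and the data stays out of the GC heap:

```scala
import java.nio.ByteBuffer

object ColumnarSketch {
  // One column, stored contiguously off-heap: fixed-width ints, no object
  // headers, invisible to the garbage collector.
  final class IntColumn(capacity: Int) {
    private val buf = ByteBuffer.allocateDirect(capacity * 4) // off-heap
    def set(i: Int, v: Int): Unit = buf.putInt(i * 4, v)
    def get(i: Int): Int          = buf.getInt(i * 4)
  }

  // Summing "age" scans one compact buffer; a row-wise layout would drag
  // every full record through the heap instead.
  def sumAges(ages: IntColumn, n: Int): Long = {
    var s = 0L
    var i = 0
    while (i < n) { s += ages.get(i); i += 1 }
    s
  }
}
```

Contiguous, fixed-width columns like this are also what makes run-length or dictionary compression effective, and they let some operations run on the compressed form directly.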

In general you can apply exactly the same types of optimizations using hand-crafted code on plain RDDs. After all, Datasets are backed by RDDs. The difference is only how much effort it takes.

  • Hand-crafted execution-plan optimizations are relatively simple to achieve.
  • Making code compiler-friendly requires deeper knowledge and is error-prone and verbose.
  • Using sun.misc.Unsafe with native memory allocation is not for the faint-hearted.

Despite all its merits, the Dataset API is not universal. While certain types of common tasks can benefit from its optimizations in many contexts, you may see no improvement whatsoever, or even performance degradation, compared to the RDD equivalent.