How to use the RangePartitioner in Spark


I want to use a RangePartitioner in my Java Spark application, but I have no clue how to set the two Scala parameters scala.math.Ordering<K> evidence$1 and scala.reflect.ClassTag<K> evidence$2. Can someone give me an example?

Here is the link to the JavaDoc of RangePartitioner (it was no help for me because I'm new to Spark and Scala...):

My code currently looks like this:

    JavaPairRDD<Integer, String> partitionedRDD = rdd.partitionBy(new RangePartitioner<Integer, String>(10, rdd, true, evidence$1, evidence$2));

There are 3 answers

Answer by dnocode (score 0)

Here you can find an example of how to use RangePartitioner from Java:

https://github.com/PacktPublishing/Apache-Spark-2x-for-Java-Developers/blob/master/src/main/java/com/packt/sfjd/ch7/Partitioning.java

If you want to implement a custom Comparator for a custom key type:

  • implement your custom Comparator in Java
  • then convert it to a Scala Ordering with Ordering$.MODULE$.comparatorToOrdering(new MyCustomComparator()) and pass the result to the RangePartitioner (see the sketch below)
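
To make that concrete, here is a minimal sketch. The key type MyKey, the comparator MyCustomComparator, and the wrapper class CustomKeyPartitioning are all hypothetical stand-ins for your own classes; the conversion call and the constructor arguments follow the same pattern as the answer below.

    import java.io.Serializable;
    import java.util.Comparator;

    import org.apache.spark.RangePartitioner;
    import org.apache.spark.api.java.JavaPairRDD;

    import scala.math.Ordering;
    import scala.math.Ordering$;
    import scala.reflect.ClassTag;
    import scala.reflect.ClassTag$;

    public class CustomKeyPartitioning {

        // Hypothetical custom key type.
        public static class MyKey implements Serializable {
            final int id;
            public MyKey(int id) { this.id = id; }
        }

        // Hypothetical Java Comparator for MyKey. It should be Serializable
        // because Spark ships it to the executors.
        public static class MyCustomComparator
                implements Comparator<MyKey>, Serializable {
            @Override
            public int compare(MyKey a, MyKey b) {
                return Integer.compare(a.id, b.id);
            }
        }

        public static JavaPairRDD<MyKey, String> rangePartition(
                JavaPairRDD<MyKey, String> rdd) {
            // Convert the Java Comparator to a Scala Ordering.
            Ordering<MyKey> ordering =
                    Ordering$.MODULE$.comparatorToOrdering(new MyCustomComparator());
            // Build the ClassTag for the key type.
            ClassTag<MyKey> classTag = ClassTag$.MODULE$.apply(MyKey.class);
            // rdd.rdd() unwraps the Scala RDD that the constructor expects.
            RangePartitioner<MyKey, String> partitioner =
                    new RangePartitioner<>(10, rdd.rdd(), true, ordering, classTag);
            return rdd.partitionBy(partitioner);
        }
    }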

Answer by whaleberg (score 0)

You can create both the Ordering and the ClassTag by calling methods on their companion objects.

These are referred to from Java like this: ClassName$.MODULE$.functionName()

One further wrinkle is that the constructor requires a Scala RDD, not a Java one. You can get the Scala RDD from a JavaPairRDD by calling rdd.rdd():

    import java.util.Comparator;
    import org.apache.spark.RangePartitioner;
    import org.apache.spark.api.java.JavaPairRDD;
    import scala.math.Ordering;
    import scala.math.Ordering$;
    import scala.reflect.ClassTag;
    import scala.reflect.ClassTag$;

    final Ordering<Integer> ordering = Ordering$.MODULE$.comparatorToOrdering(Comparator.<Integer>naturalOrder());
    final ClassTag<Integer> classTag = ClassTag$.MODULE$.apply(Integer.class);
    final RangePartitioner<Integer, String> partitioner = new RangePartitioner<>(
            10,
            rdd.rdd(),   // note the call to rdd.rdd() here; this gets the Scala RDD backing the Java one
            true,
            ordering,
            classTag);
    final JavaPairRDD<Integer, String> partitioned = rdd.partitionBy(partitioner);
Answer by nir (score 1)

If you look at the API docs for OrderedRDDFunctions, there is an example of how you can set an implicit ordering for your key.

    import org.apache.spark.SparkContext._

    val rdd: RDD[(String, Int)] = ...
    implicit val caseInsensitiveOrdering = new Ordering[String] {
      override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
    }

I know it's a snippet from the Spark Scala API docs, but you can at least infer how to pass your Ordering parameter. For the ClassTag type, I'd suggest checking the general Scala docs or forums. I'm adding the scala tag to the question.
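
For completeness, here is a rough Java translation of that idea; a sketch only. It assumes rdd is an existing JavaPairRDD<String, Integer>, reuses the JDK's String.CASE_INSENSITIVE_ORDER as the Comparator, and otherwise follows the same pattern as the answer above.

    import org.apache.spark.RangePartitioner;
    import org.apache.spark.api.java.JavaPairRDD;

    import scala.math.Ordering;
    import scala.math.Ordering$;
    import scala.reflect.ClassTag;
    import scala.reflect.ClassTag$;

    // Case-insensitive ordering for String keys, mirroring the Scala snippet.
    // String.CASE_INSENSITIVE_ORDER is already Serializable, so Spark can ship it.
    Ordering<String> caseInsensitive =
            Ordering$.MODULE$.comparatorToOrdering(String.CASE_INSENSITIVE_ORDER);
    ClassTag<String> stringTag = ClassTag$.MODULE$.apply(String.class);

    // rdd is assumed to be a JavaPairRDD<String, Integer>.
    RangePartitioner<String, Integer> partitioner =
            new RangePartitioner<>(10, rdd.rdd(), true, caseInsensitive, stringTag);
    JavaPairRDD<String, Integer> partitioned = rdd.partitionBy(partitioner);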