CustomPartiton a JavaPairRDD

120 views Asked by At

I have created a JavaPairRDD from two different datasets- first one is the output file from METIS graph partitioning algorithm, and second is the input graph for the METIS graph partitioner. The key value pair of the JavaPairRDD is constructed as:

JavaPairRDD<Integer, Map<Integer, List<Integer>>> metisGraphWithPartitionIndexRDD =javaSparkContext.parallelizePairs(mapMetisGraphWithPartitionIndex);

and the RDD after printing looks like:

0 {1=[5, 3, 2]}
0 {2=[1, 3, 4]}
0 {3=[5, 4, 2, 1]}
1 {4=[2, 3, 6, 7]}
0 {5=[1, 3, 6]}
1 {6=[5, 4, 7]}
1 {7=[6, 4]}

The structure of JavaPairRDD holds three different elements. <Key1, Map<Key2,List>, where key1 represents the partition index, and have 0 and 1, if number of partition is 2 and so on. Whereas, the key2 represents vertexId, and List holds the adjacent vertices of the vertexId. The structure is given as:

PartitionIndex {vertex_id = [adjacent vertex list]

And I want to partition this JavaPairRDD on the basis of the key. However, my custom partitioning is not working. Could someone look at it?

JavaPairRDD<Integer, Map<Integer, List<Integer>>> customPartitioned = metisGraphWithPartitionIndexRDD.partitionBy(new CustomPartitioner(2));

JavaRDD<Object> customPartitionedIndex = customPartitioned.mapPartitionsWithIndex((index, tupleIterator) -> {
List<Object> list = new ArrayList<>();
while (tupleIterator.hasNext()) {
list.add("Partition number: " + index + " ,key: " + tupleIterator.next()._1());}
    return list.iterator();
    }, true);

And this is my CustomPartitioner class

public class CustomPartitioner extends Partitioner {
    private final int numParts;

    public CustomPartitioner(int i) {
        numParts = i;
    }

    @Override
    public int getPartition(Object key) {
        int partIndex = ((Integer) key);
        return partIndex;
    }

    @Override
    public int numPartitions() {
        // TODO Auto-generated method stub
        return numParts;
    }

}
1

There are 1 answers

0
Aavash Bhandari On

It seems like the custom partitioning method works just fine. I just had to fix few here and there to validate the partitioner was running. In the given code I just modified this verification code.

JavaRDD<Integer> result = customPartitioned.mapPartitionsWithIndex((idx, i) -> {
                List<Integer> partitionCheckList = new ArrayList<>();
                while (i.hasNext()) {
                    partitionCheckList.add(i.next()._1);
                }
                return partitionCheckList.iterator();
            }, true);

            System.out.println(result.collect());

This simply prints the result where the JavaPairRDD has been partitioned according to the key. So the tuples from the JavaPairRDD are partitoned into 2 partitions with partiton value as 0 and 1.

[0, 0, 0, 0, 1, 1, 1]