I have created a JavaPairRDD from two datasets: the first is the output file of the METIS graph partitioning algorithm, and the second is the input graph for the METIS graph partitioner. The JavaPairRDD is constructed as:
JavaPairRDD<Integer, Map<Integer, List<Integer>>> metisGraphWithPartitionIndexRDD = javaSparkContext.parallelizePairs(mapMetisGraphWithPartitionIndex);
and the RDD after printing looks like:
0 {1=[5, 3, 2]}
0 {2=[1, 3, 4]}
0 {3=[5, 4, 2, 1]}
1 {4=[2, 3, 6, 7]}
0 {5=[1, 3, 6]}
1 {6=[5, 4, 7]}
1 {7=[6, 4]}
Each entry of the JavaPairRDD holds three elements in the form <Key1, Map<Key2, List>>. Key1 is the partition index (0 or 1 when the number of partitions is 2, and so on), Key2 is the vertex ID, and the List holds the vertices adjacent to that vertex. The structure is:
PartitionIndex {vertex_id=[adjacent vertex list]}
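To make the structure above concrete, here is a minimal plain-Java sketch of how such pairs could be assembled before they are handed to parallelizePairs. It is an illustration under assumptions, not the original loading code: the class name MetisPairs and the method build are hypothetical, the two inputs are given as in-memory lists rather than parsed from the METIS files, and java.util.Map.Entry stands in for Spark's Tuple2 so that the example runs without Spark on the classpath.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper: not part of the original code.
class MetisPairs {
    // Pairs each vertex's partition index with a single-entry map {vertexId=[neighbours]}.
    // partitionOfVertex.get(i) and adjacency.get(i) both describe vertex i + 1.
    static List<Map.Entry<Integer, Map<Integer, List<Integer>>>> build(
            List<Integer> partitionOfVertex, List<List<Integer>> adjacency) {
        List<Map.Entry<Integer, Map<Integer, List<Integer>>>> pairs = new ArrayList<>();
        for (int i = 0; i < adjacency.size(); i++) {
            int vertexId = i + 1;
            Map<Integer, List<Integer>> vertexToNeighbours =
                    Collections.singletonMap(vertexId, adjacency.get(i));
            pairs.add(new SimpleEntry<>(partitionOfVertex.get(i), vertexToNeighbours));
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Values copied from the printed RDD in the question.
        List<Integer> partitionOfVertex = Arrays.asList(0, 0, 0, 1, 0, 1, 1);
        List<List<Integer>> adjacency = Arrays.asList(
                Arrays.asList(5, 3, 2),
                Arrays.asList(1, 3, 4),
                Arrays.asList(5, 4, 2, 1),
                Arrays.asList(2, 3, 6, 7),
                Arrays.asList(1, 3, 6),
                Arrays.asList(5, 4, 7),
                Arrays.asList(6, 4));
        for (Map.Entry<Integer, Map<Integer, List<Integer>>> pair
                : build(partitionOfVertex, adjacency)) {
            System.out.println(pair.getKey() + " " + pair.getValue());
        }
    }
}
```

With Spark available, each entry would instead be a Tuple2 of the same key and value.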
I want to partition this JavaPairRDD by its key (the partition index). However, my custom partitioning does not seem to be working. Could someone take a look?
// Repartition so that each tuple goes to the partition named by its key.
JavaPairRDD<Integer, Map<Integer, List<Integer>>> customPartitioned =
        metisGraphWithPartitionIndexRDD.partitionBy(new CustomPartitioner(2));

// Tag each tuple's key with the index of the partition it ended up in.
JavaRDD<Object> customPartitionedIndex = customPartitioned.mapPartitionsWithIndex((index, tupleIterator) -> {
    List<Object> list = new ArrayList<>();
    while (tupleIterator.hasNext()) {
        list.add("Partition number: " + index + " ,key: " + tupleIterator.next()._1());
    }
    return list.iterator();
}, true);
And this is my CustomPartitioner class:
import org.apache.spark.Partitioner;

public class CustomPartitioner extends Partitioner {
    private final int numParts;

    public CustomPartitioner(int numParts) {
        this.numParts = numParts;
    }

    // The key is already the METIS partition index, so it is used directly.
    @Override
    public int getPartition(Object key) {
        return (Integer) key;
    }

    @Override
    public int numPartitions() {
        return numParts;
    }
}
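One thing worth keeping in mind: Spark expects getPartition to return a value between 0 and numPartitions - 1, and the class above returns the key unchanged, so it relies on every key being a valid partition index. Below is a rough plain-Java sketch of the same key-to-index mapping with that assumption checked explicitly. The class PartitionIndexCheck and the method partitionFor are hypothetical names used only for illustration. The sketch deliberately does not extend Spark's Partitioner so that it can run on its own.

```java
// Hypothetical standalone check: mirrors the getPartition logic without Spark.
class PartitionIndexCheck {
    // Returns the key itself as the partition index, but rejects keys that
    // are not Integers in the range [0, numParts).
    static int partitionFor(Object key, int numParts) {
        if (!(key instanceof Integer)) {
            throw new IllegalArgumentException("Expected an Integer key but got: " + key);
        }
        int partIndex = (Integer) key;
        if (partIndex < 0 || partIndex >= numParts) {
            throw new IllegalArgumentException(
                    "Key " + partIndex + " is not a valid index for " + numParts + " partitions");
        }
        return partIndex;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor(0, 2));
        System.out.println(partitionFor(1, 2));
        try {
            partitionFor(2, 2); // out of range for two partitions
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The same two checks could be placed inside getPartition if the METIS output might contain more partitions than the number passed to the constructor.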
It turns out the custom partitioner works just fine. I only had to fix a few things here and there to validate that the partitioner was running; in the code given above, the only part I modified was the verification code.
This simply prints the result, which shows that the JavaPairRDD has been partitioned according to the key. The tuples from the JavaPairRDD are split into 2 partitions, with partition values 0 and 1.
[0, 0, 0, 0, 1, 1, 1]
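For readers who want to see why the keys come out grouped like this, here is a small plain-Java simulation of partitioning by key. It is not the Spark verification code and does not use Spark at all. The class PartitionByKeySimulation and the method keysInPartitionOrder are hypothetical names, and the input keys are the ones from the printed RDD in the question, in their original order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation: imitates the effect of partitioning by key on a plain list.
class PartitionByKeySimulation {
    // Places each key in the bucket whose index equals the key,
    // then lists the keys bucket by bucket.
    static List<Integer> keysInPartitionOrder(List<Integer> keys, int numParts) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int p = 0; p < numParts; p++) {
            partitions.add(new ArrayList<>());
        }
        for (Integer key : keys) {
            partitions.get(key).add(key);
        }
        List<Integer> ordered = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            ordered.addAll(partition);
        }
        return ordered;
    }

    public static void main(String[] args) {
        // Keys of vertices 1..7 as they appear in the question.
        List<Integer> keys = Arrays.asList(0, 0, 0, 1, 0, 1, 1);
        System.out.println(keysInPartitionOrder(keys, 2)); // prints [0, 0, 0, 0, 1, 1, 1]
    }
}
```

The four tuples with key 0 end up together in partition 0 and the three tuples with key 1 in partition 1, which matches the output above.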