Why are split points out of order with Hadoop's TotalOrderPartitioner?


I use Hadoop's TotalOrderPartitioner with InputSampler.RandomSampler as the input sampler.

But when I increase the number of slave nodes and reduce tasks to 8, I get the following error:

Caused by: java.io.IOException: Split points are out of order

I don't know the reason for this error.

How should I set the three parameters of InputSampler.RandomSampler?


There are 2 answers

Answer by work.asr:

Are you sure you are generating enough keys? From the TotalOrderPartitioner javadoc:

"The input file must be sorted with the same comparator and contain JobContextImpl.getNumReduceTasks() - 1 keys."

Answer by Eponymous:

There are two possible problems:

  • You have duplicate keys
  • You are using one comparator for the input sampler and a different one for the job that runs the total order partitioner

You can diagnose this by downloading the partition file and examining its contents. The partition file is the value of total.order.partitioner.path (mapreduce.totalorderpartitioner.path in newer Hadoop releases) if it is set, or _partition.lst otherwise. If your keys are text, you can run hdfs dfs -text path_to_partition_file | less to take a look. This may also work for other key types, but I haven't tried it.

If there are duplicate lines in the partition file, you have duplicate keys. Otherwise, you're probably using the wrong comparator.
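The decision rule above can be sketched in plain Java without any Hadoop dependency. This is only an illustration: it assumes the split points have already been read from the partition file into a list of strings, and the class name SplitPointDiagnosis and its Verdict enum are hypothetical.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class SplitPointDiagnosis {
    enum Verdict { OK, DUPLICATE_KEYS, WRONG_ORDER }

    // Hypothetical helper: applies the rule "duplicates first, otherwise
    // suspect the comparator" to split points already loaded into memory.
    static Verdict diagnose(List<String> splitPoints, Comparator<String> comparator) {
        // Any repeated entry in the file means duplicate keys.
        Set<String> seen = new HashSet<>();
        for (String key : splitPoints) {
            if (!seen.add(key)) {
                return Verdict.DUPLICATE_KEYS;
            }
        }
        // No duplicates: check that each adjacent pair is strictly increasing
        // under the comparator the job will use.
        for (int i = 0; i + 1 < splitPoints.size(); i++) {
            if (comparator.compare(splitPoints.get(i), splitPoints.get(i + 1)) >= 0) {
                return Verdict.WRONG_ORDER;
            }
        }
        return Verdict.OK;
    }

    public static void main(String[] args) {
        Comparator<String> natural = Comparator.naturalOrder();
        System.out.println(diagnose(Arrays.asList("a", "a", "b"), natural)); // DUPLICATE_KEYS
        System.out.println(diagnose(Arrays.asList("b", "a", "c"), natural)); // WRONG_ORDER
        System.out.println(diagnose(Arrays.asList("a", "b", "c"), natural)); // OK
    }
}
```

For real keys you would pass the same comparator the job is configured with, not natural string order.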

How to fix

Duplicate Keys

My best guess is that your keys are so unbalanced that an even division of records among partitions is generating partitions with identical split points.
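To see how skewed keys could lead to identical split points, here is a deliberately simplified model in plain Java. It is an assumption for illustration, not Hadoop's actual sampler code: it sorts the sampled keys and takes one split point at each evenly spaced position. The class name EvenSplitDemo is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class EvenSplitDemo {
    // Simplified model (assumption): sort the samples, then pick the key at
    // each of the numReducers - 1 evenly spaced positions.
    // Expects a non-empty sample list.
    static List<String> pickSplitPoints(List<String> samples, int numReducers) {
        List<String> sorted = new ArrayList<>(samples);
        Collections.sort(sorted);
        List<String> splits = new ArrayList<>();
        for (int i = 1; i < numReducers; i++) {
            int k = (int) ((long) sorted.size() * i / numReducers);
            splits.add(sorted.get(k));
        }
        return splits;
    }

    public static void main(String[] args) {
        // Six of the eight samples share one key, so two picks land on it.
        List<String> skewed = Arrays.asList("a", "a", "a", "a", "a", "a", "b", "c");
        System.out.println(pickSplitPoints(skewed, 4));   // [a, a, b]

        // Evenly distributed keys give distinct split points.
        List<String> balanced = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h");
        System.out.println(pickSplitPoints(balanced, 4)); // [c, e, g]
    }
}
```

In this model, the more reducers you ask for, the more picks fall inside the run of the dominant key, which would fit the error appearing only after raising the reducer count.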

To solve this you have several options:

  • Choose a value to use as a key that better distinguishes your inputs (probably not possible, but much better if you can)
  • Use fewer partitions and reducers (not as scalable or as reliable as the next solution, but simpler to implement, especially if you have only a few duplicates). Divide the original number of partitions by the largest number of duplicate entries. For example, if your partition file lists a, a, b, c, c, c, d, e as split points, then you have 9 reducers (8 split points) and a maximum of 3 duplicates. So use 3 reducers (3 = floor(9/3)), and if your sampling is good, you'll probably end up with proper split points. For complete stability you'd need to re-run the partition step whenever it produces duplicate entries, to guard against the occasional over-sampling of the unbalanced keys. At that level of complexity, you may as well look into the next solution.
  • Read the partitions file, rewrite it without duplicate entries, count the number of entries (call it num_non_duplicates) and use num_non_duplicates+1 reducers. The reducers with the duplicated keys will have much more work than the other reducers and run longer. If the reduce operation is commutative and associative, you may be able to mitigate this by using combiners.
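The arithmetic behind the last two options can be sketched as follows. This is a plain-Java illustration that assumes the split points are already in a sorted list of strings; the class and method names are hypothetical, and actually rewriting the partition file and setting the reducer count on the job are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

final class ReducerCountOptions {
    // Longest run of identical adjacent entries. Assumes the list is sorted,
    // so equal keys sit next to each other.
    static int maxDuplicates(List<String> splitPoints) {
        int best = 1;
        int run = 1;
        for (int i = 1; i < splitPoints.size(); i++) {
            run = splitPoints.get(i).equals(splitPoints.get(i - 1)) ? run + 1 : 1;
            best = Math.max(best, run);
        }
        return best;
    }

    // "Fewer reducers" option: floor(original reducers / largest duplicate count).
    static int reducersByDivision(List<String> splitPoints) {
        int originalReducers = splitPoints.size() + 1;
        return Math.max(1, originalReducers / maxDuplicates(splitPoints));
    }

    // "Rewrite the file" option: drop repeats, keeping first occurrences in order.
    static List<String> withoutDuplicates(List<String> splitPoints) {
        return new ArrayList<>(new LinkedHashSet<>(splitPoints));
    }

    // num_non_duplicates + 1 reducers.
    static int reducersAfterDedup(List<String> splitPoints) {
        return withoutDuplicates(splitPoints).size() + 1;
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("a", "a", "b", "c", "c", "c", "d", "e");
        System.out.println(reducersByDivision(splits)); // 3
        System.out.println(withoutDuplicates(splits));  // [a, b, c, d, e]
        System.out.println(reducersAfterDedup(splits)); // 6
    }
}
```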

Using the wrong comparator

Make sure mapred.output.key.comparator.class (the property that Job.setSortComparatorClass sets) has the same value for both the call to writePartitionFile and the job that uses TotalOrderPartitioner.
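A small plain-Java illustration of why a mismatch trips the check: keys sorted under one comparator can look out of order under another. The two comparators here (case-insensitive versus natural string order) and the class name are stand-ins chosen for the example, not anything Hadoop-specific.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class ComparatorMismatchDemo {
    // Same rule as the partitioner's check: every adjacent pair must be
    // strictly increasing under the given comparator.
    static boolean passesOrderCheck(List<String> splitPoints, Comparator<String> comparator) {
        for (int i = 0; i + 1 < splitPoints.size(); i++) {
            if (comparator.compare(splitPoints.get(i), splitPoints.get(i + 1)) >= 0) {
                return false;
            }
        }
        return true;
    }

    // Stand-in for the sampling step: sort the keys with the sampler's comparator.
    static List<String> sortedWith(List<String> keys, Comparator<String> comparator) {
        List<String> copy = new ArrayList<>(keys);
        copy.sort(comparator);
        return copy;
    }

    public static void main(String[] args) {
        List<String> splits = sortedWith(
                Arrays.asList("cherry", "apple", "Banana"), String.CASE_INSENSITIVE_ORDER);
        System.out.println(splits); // [apple, Banana, cherry]

        // Same comparator on both sides: the check passes.
        System.out.println(passesOrderCheck(splits, String.CASE_INSENSITIVE_ORDER)); // true

        // Different comparator in the job: "apple" sorts after "Banana", so it fails.
        System.out.println(passesOrderCheck(splits, Comparator.naturalOrder())); // false
    }
}
```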

Extra stuff you don't need to read but might enjoy:

The "Split points are out of order" error message comes from this code in TotalOrderPartitioner:

  RawComparator<K> comparator =
    (RawComparator<K>) job.getOutputKeyComparator();
  for (int i = 0; i < splitPoints.length - 1; ++i) {
    if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
      throw new IOException("Split points are out of order");
    }
  }

The line comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0 means that a pair of split points is rejected if they are either identical or out-of-order.

With 1 or 2 reducers this error can never occur: there is at most 1 split point, so the loop never executes.