HBase Java API: bulk load error "Added a key not lexically larger than previous" (with JavaPairRDD<ImmutableBytesWritable, KeyValue>)


I use bulkLoadHFiles.bulkLoad. I have an org.apache.spark.sql.Dataset with two string columns (key and value), which I convert into a JavaPairRDD<ImmutableBytesWritable, KeyValue>. If the Dataset is not pre-sorted, I get the error "IOException: Added a key not lexically larger than previous ...":

JavaPairRDD<ImmutableBytesWritable, KeyValue> pairsToBulkLoad =
    inputDataset.toJavaRDD().mapToPair(row -> convertToKV(row, "cf", "column"));

BulkLoadHFiles bulkLoadHFiles = BulkLoadHFiles.create(jobConfiguration);
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
pairsToBulkLoad.saveAsNewAPIHadoopFile(output.toString(), ImmutableBytesWritable.class,
        KeyValue.class, HFileOutputFormat2.class, jobConfiguration);
bulkLoadHFiles.bulkLoad(TableName.valueOf(hbaseFullTableName), output);

public Tuple2<ImmutableBytesWritable, KeyValue> convertToKV(final Row row, final String columnFamily, final String column) {
    final String key = row.getString(0);
    final String value = row.getString(1);
    return new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(key)),
            new KeyValue(Bytes.toBytes(key), Bytes.toBytes(columnFamily),
                         Bytes.toBytes(column), Bytes.toBytes(value)));
}

If I submit a pre-sorted Dataset, this code works reliably. In reality, however, an unsorted Dataset may arrive here in a production environment, so I tried inserting pairsToBulkLoad = pairsToBulkLoad.sortByKey(true):

JavaPairRDD<ImmutableBytesWritable, KeyValue> pairsToBulkLoad =
    inputDataset.toJavaRDD().mapToPair(row -> convertToKV(row, "cf", "column"));

pairsToBulkLoad = pairsToBulkLoad.sortByKey(true);

BulkLoadHFiles bulkLoadHFiles = BulkLoadHFiles.create(jobConfiguration);
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
pairsToBulkLoad.saveAsNewAPIHadoopFile(output.toString(), ImmutableBytesWritable.class,
        KeyValue.class, HFileOutputFormat2.class, jobConfiguration);
bulkLoadHFiles.bulkLoad(TableName.valueOf(hbaseFullTableName), output);

In this case, I get another error: "Job aborted due to stage failure: task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.hbase.io.ImmutableBytesWritable - object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable ...".

I don't understand how to approach a solution:

  1. If I sort the Dataset using Spark, does that guarantee the ordering of the resulting JavaPairRDD<ImmutableBytesWritable, KeyValue>?
  2. Or is the solution to sort the JavaPairRDD<ImmutableBytesWritable, KeyValue> itself? If so, why does the sort fail with "object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable)"? (One possible workaround is sketched at the end of this question.)

(Java 8, Spark 3, HBase 2.4.2)

I would be grateful for any advice.
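
Regarding question 2: ImmutableBytesWritable is a Hadoop Writable and does not implement java.io.Serializable, so the shuffle that sortByKey triggers fails under Spark's default Java serializer. One commonly used workaround is to switch the job to Kryo serialization and register the HBase classes. The snippet below is only a minimal sketch (the app name and the way the session is built are placeholders, not taken from the original post):

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

// Sketch: use Kryo instead of Java serialization so that
// ImmutableBytesWritable/KeyValue can be shipped during the sortByKey shuffle.
SparkConf sparkConf = new SparkConf()
        .setAppName("hbase-bulkload")   // placeholder app name
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .registerKryoClasses(new Class<?>[] {
                ImmutableBytesWritable.class,
                KeyValue.class
        });

// Build the session (and thus the RDDs) from this configuration.
SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();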

1 Answer

clipper1995:

I managed to solve this problem myself. You can get the JavaPairRDD<ImmutableBytesWritable, KeyValue> into sorted order by sorting on the plain String keys first and only then converting them to ImmutableBytesWritable/KeyValue:

JavaPairRDD<ImmutableBytesWritable, KeyValue> pairsToBulkLoad =
    inputDataset
        .toJavaRDD()
        // sort on plain String keys, which are both Comparable and Serializable
        .mapToPair(row -> new Tuple2<>(row.getString(0), row.getString(1)))
        .sortByKey(true)
        // only now wrap the sorted keys into ImmutableBytesWritable/KeyValue
        .mapToPair(pair -> convertToKVCol("cf", "column", pair._1, pair._2));

private static Tuple2<ImmutableBytesWritable, KeyValue> convertToKVCol(
        final String columnFamily,
        final String column,
        final String key,
        final String value) {
    return new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(key)),
            new KeyValue(Bytes.toBytes(key), Bytes.toBytes(columnFamily),
                         Bytes.toBytes(column), Bytes.toBytes(value)));
}
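
For completeness, the sorted RDD then plugs into the same HFile write and bulk load shown in the question; the variables jobConfiguration, output and hbaseFullTableName below are taken from the question's snippets:

// Write the now-sorted pairs as HFiles and bulk-load them, as in the question.
BulkLoadHFiles bulkLoadHFiles = BulkLoadHFiles.create(jobConfiguration);
pairsToBulkLoad.saveAsNewAPIHadoopFile(output.toString(), ImmutableBytesWritable.class,
        KeyValue.class, HFileOutputFormat2.class, jobConfiguration);
bulkLoadHFiles.bulkLoad(TableName.valueOf(hbaseFullTableName), output);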