Extend HBase Put to avoid original Row Check in add method


Related post: HBase Need to export data from one cluster and import it to another with slight modification in row key

As described in the post above, I need to export an HBase table from one cluster and import it into another cluster, changing the row keys according to a match pattern.

The stock "org.apache.hadoop.hbase.mapreduce.Import" tool can rename column families through the "HBASE_IMPORTER_RENAME_CFS" property.

I have slightly modified the Import code to support changing the row key as well. My code is available on Pastebin: https://pastebin.com/ticgeBb0

The row key is changed with the code below.

private static Cell convertRowKv(Cell kv, Map<byte[], byte[]> rowkeyReplaceMap) {
        if (rowkeyReplaceMap != null) {
            byte[] oldrowkeyName = CellUtil.cloneRow(kv);
            String oldrowkey = Bytes.toString(oldrowkeyName);
            Set<byte[]> keys = rowkeyReplaceMap.keySet();
            for (byte[] key : keys) {
                if (oldrowkey.contains(Bytes.toString(key))) {
                    byte[] newrowkeyName = rowkeyReplaceMap.get(key);
                    ByteBuffer buffer = ByteBuffer.wrap(oldrowkeyName);
                    buffer.get(key);
                    ByteBuffer newbuffer = buffer.slice();
                    ByteBuffer bb = ByteBuffer.allocate(newrowkeyName.length + newbuffer.capacity());
                    byte[] newrowkey = bb.array();
                    kv = new KeyValue(newrowkey, // row buffer
                            0, // row offset
                            newrowkey.length, // row length
                            kv.getFamilyArray(), // CF buffer
                            kv.getFamilyOffset(), // CF offset
                            kv.getFamilyLength(), // CF length
                            kv.getQualifierArray(), // qualifier buffer
                            kv.getQualifierOffset(), // qualifier offset
                            kv.getQualifierLength(), // qualifier length
                            kv.getTimestamp(), // timestamp
                            KeyValue.Type.codeToType(kv.getTypeByte()), // KV type
                            kv.getValueArray(), // value buffer
                            kv.getValueOffset(), // value offset
                            kv.getValueLength()); // value length
                }
            }
        }
        return kv;
    }
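
A note on the snippet above: `ByteBuffer.allocate(...)` returns a zero-filled buffer and nothing is ever written to `bb`, so `bb.array()` yields a row key made only of `\x00` bytes. `buffer.get(key)` also copies row bytes into the map's key array instead of just skipping them. The following is a minimal JDK-only sketch of the byte handling, assuming the intent is to replace a leading prefix of the row key (the `contains` test above would also match in the middle of the key). The class and method names are hypothetical and not part of HBase.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper, not part of HBase: swaps a leading prefix of a row key.
final class RowKeyRewriter {

    // Returns true when row begins with prefix.
    static boolean startsWith(byte[] row, byte[] prefix) {
        if (row.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (row[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    // Builds newPrefix + (row without oldPrefix).
    // Returns row unchanged when it does not start with oldPrefix.
    static byte[] replacePrefix(byte[] row, byte[] oldPrefix, byte[] newPrefix) {
        if (!startsWith(row, oldPrefix)) {
            return row;
        }
        int tailLength = row.length - oldPrefix.length;
        // copyOf pads with zeros; the tail is then copied over the padding.
        byte[] result = Arrays.copyOf(newPrefix, newPrefix.length + tailLength);
        System.arraycopy(row, oldPrefix.length, result, newPrefix.length, tailLength);
        return result;
    }

    public static void main(String[] args) {
        byte[] rewritten = replacePrefix(
                "123_abcf".getBytes(StandardCharsets.UTF_8),
                "123".getBytes(StandardCharsets.UTF_8),
                "123456".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(rewritten, StandardCharsets.UTF_8)); // prints 123456_abcf
    }
}
```

The resulting array could then be passed as the row buffer of the new `KeyValue`, with offset 0 and its full length, as in the constructor call above.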

I then ran the import:

hbase org.apache.hadoop.hbase.mapreduce.ImportWithRowKeyChange -DHBASE_IMPORTER_RENAME_ROW=123:123456 import file:///home/nshsh/export/
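
For reference, the stock `Import` reads `HBASE_IMPORTER_RENAME_CFS` as comma-separated `source:destination` pairs. Below is a sketch of parsing the row-key option the same way, assuming `HBASE_IMPORTER_RENAME_ROW` follows that format. The `RenameRowOption` class is hypothetical, and it uses strings rather than `byte[]` so that it stays JDK-only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for a value such as "123:123456" or "a:b,c:d".
final class RenameRowOption {

    // Returns source -> destination mappings in the order they were given.
    static Map<String, String> parse(String value) {
        Map<String, String> mappings = new LinkedHashMap<>();
        if (value == null || value.isEmpty()) {
            return mappings;
        }
        for (String mapping : value.split(",")) {
            String[] srcAndDest = mapping.split(":");
            if (srcAndDest.length != 2) {
                throw new IllegalArgumentException(
                        "Expected source:destination but got: " + mapping);
            }
            mappings.put(srcAndDest[0], srcAndDest[1]);
        }
        return mappings;
    }
}
```

With this format, a row key pattern that itself contains `:` or `,` cannot be expressed, which may matter for composite row keys.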

The row key is changed successfully. However, when the Cell is added to the Put in the HBase table, "org.apache.hadoop.hbase.client.Put.add(Cell)" checks that the row of the kv is the same as the row of the Put. Because the row key has been changed, this check fails.

I then commented out the check in the Put class and rebuilt hbase-client.jar. I also tried writing an HBasePut class that extends Put:

public class HBasePut extends Put {

    public HBasePut(byte[] row) {
        super(row);
    }

    public Put add(Cell kv) throws IOException {
        byte[] family = CellUtil.cloneFamily(kv);
        System.err.print(Bytes.toString(family));
        List<Cell> list = getCellList(family);
        // Checking that the row of the kv is the same as the put
        /*int res = Bytes.compareTo(this.row, 0, row.length,
            kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
        if (res != 0) {
          throw new WrongRowIOException("The row in " + kv.toString() +
            " doesn't match the original one " +  Bytes.toStringBinary(this.row));
        }*/
        list.add(kv);
        familyMap.put(family, list);
        return this;
    }

}

The MapReduce task always fails with the following exception:

2020-07-24 13:37:15,105 WARN  [htable-pool1-t1] hbase.HBaseConfiguration: Config option "hbase.regionserver.lease.period" is deprecated. Instead, use "hbase.client.scanner.timeout.period"
2020-07-24 13:37:15,122 INFO  [LocalJobRunner Map Task Executor #0] client.AsyncProcess: , tableName=import
2020-07-24 13:37:15,178 INFO  [htable-pool1-t1] client.AsyncProcess: #2, table=import, attempt=18/35 failed=7ops, last exception: org.apache.hadoop.hbase.client.WrongRowIOException: org.apache.hadoop.hbase.client.WrongRowIOException: The row in \x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00/vfrt:con/1589541180643/Put/vlen=225448/seqid=0 doesn't match the original one 123_abcf
    at org.apache.hadoop.hbase.client.Put.add(Put.java:330)
    at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toPut(ProtobufUtil.java:574)
    at org.apache.hadoop.hbase.regionserver.RSRpcServices.doBatchOp(RSRpcServices.java:744)
    at org.apache.hadoop.hbase.regionserver.RSRpcServices.doNonAtomicRegionMutation(RSRpcServices.java:720)
    at org.apache.hadoop.hbase.regionserver.RSRpcServices.multi(RSRpcServices.java:2168)
    at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:33656)
    at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2196)
    at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
    at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:133)
    at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:108)
    at java.lang.Thread.run(Thread.java:745)

I don't know where the original Put class is still being referenced in the task.

Can someone please help me fix this?
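
A note on the stack trace above: `RSRpcServices` is a region server class, and `ProtobufUtil.toPut` rebuilds a `Put` from the RPC payload. The `WrongRowIOException` therefore appears to be raised on the region server, which loads its own copy of `Put` and never sees the patched client jar or `HBasePut`. If so, bypassing the check on the client cannot help, and the rows have to agree before the mutation is sent, for example by creating the `Put` with the rewritten row key instead of the original one. This is an assumption, since the mapper code is only available on Pastebin. The JDK-only stand-in below mimics the comparison from the commented-out block to show which combinations pass; `rowMatches` is a hypothetical name, not an HBase method.

```java
import java.nio.charset.StandardCharsets;

// Stand-in for the row check in Put.add(Cell); not HBase code.
final class RowCheckSketch {

    // Byte-wise equality of the Put's row and the row slice stored in the cell.
    static boolean rowMatches(byte[] putRow, byte[] cellRowArray, int rowOffset, int rowLength) {
        if (putRow.length != rowLength) {
            return false;
        }
        for (int i = 0; i < rowLength; i++) {
            if (putRow[i] != cellRowArray[rowOffset + i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        byte[] originalRow = "123_abcf".getBytes(StandardCharsets.UTF_8);
        byte[] rewrittenRow = "123456_abcf".getBytes(StandardCharsets.UTF_8);

        // Put keyed by the original row, cell carrying the rewritten row: rejected.
        System.out.println(rowMatches(originalRow, rewrittenRow, 0, rewrittenRow.length)); // prints false

        // Put keyed by the rewritten row, cell carrying the same row: accepted.
        System.out.println(rowMatches(rewrittenRow, rewrittenRow, 0, rewrittenRow.length)); // prints true
    }
}
```

The error message in the log is consistent with the first case: the Put still carries `123_abcf` while the cell carries a different row.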
