Spark and HBase Snapshots

Under the assumption that we could access the data much faster by pulling directly from HDFS instead of going through the HBase API, we're trying to build an RDD based on a table snapshot from HBase.

So, I have a snapshot called "dm_test_snap". I seem to be able to get most of the configuration stuff working, but my RDD is null (despite there being data in the Snapshot itself).

I'm having a hell of a time finding an example of anyone doing offline analysis of HBase snapshots with Spark, but I can't believe I'm alone in trying to get this working. Any help or suggestions are greatly appreciated.

Here is a snippet of my code:

object TestSnap  {
  def main(args: Array[String]) {
    val config = ConfigFactory.load()
    val hbaseRootDir =  config.getString("hbase.rootdir")
    val sparkConf = new SparkConf()
      .setAppName("testnsnap")
      .setMaster(config.getString("spark.app.master"))
      .setJars(SparkContext.jarOfObject(this))
      .set("spark.executor.memory", "2g")
      .set("spark.default.parallelism", "160")

    val sc = new SparkContext(sparkConf)

    println("Creating hbase configuration")
    val conf = HBaseConfiguration.create()

    conf.set("hbase.rootdir", hbaseRootDir)
    conf.set("hbase.zookeeper.quorum",  config.getString("hbase.zookeeper.quorum"))
    conf.set("zookeeper.session.timeout", config.getString("zookeeper.session.timeout"))
    conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "dm_test_snap")

    val scan = new Scan
    val job = Job.getInstance(conf)

    TableSnapshotInputFormat.setInput(job, "dm_test_snap", 
        new Path("hdfs://nameservice1/tmp"))

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableSnapshotInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    hBaseRDD.count()

    System.exit(0)
  }

}

Update to include the solution: The trick was, as @Holden mentioned below, that the conf wasn't getting passed through. To remedy this, I got it working by changing the call to newAPIHadoopRDD to this:

val hBaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

There was a second issue that was also highlighted by @victor's answer: I was not passing in a Scan. To fix that, I added this line and method:

conf.set(TableInputFormat.SCAN, convertScanToString(scan))

 def convertScanToString(scan : Scan) = {
      val proto = ProtobufUtil.toScan(scan);
      Base64.encodeBytes(proto.toByteArray());
    }

This also let me pull out this line from the conf.set commands:

conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "dm_test_snap")

*NOTE: This was for HBase version 0.96.1.1 on CDH5.0

Final full code for easy reference:

import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object TestSnap  {
  def main(args: Array[String]) {
    val config = ConfigFactory.load()
    val hbaseRootDir =  config.getString("hbase.rootdir")
    val sparkConf = new SparkConf()
      .setAppName("testnsnap")
      .setMaster(config.getString("spark.app.master"))
      .setJars(SparkContext.jarOfObject(this))
      .set("spark.executor.memory", "2g")
      .set("spark.default.parallelism", "160")

    val sc = new SparkContext(sparkConf)

    println("Creating hbase configuration")
    val conf = HBaseConfiguration.create()

    conf.set("hbase.rootdir", hbaseRootDir)
    conf.set("hbase.zookeeper.quorum",  config.getString("hbase.zookeeper.quorum"))
    conf.set("zookeeper.session.timeout", config.getString("zookeeper.session.timeout"))
    val scan = new Scan
    conf.set(TableInputFormat.SCAN, convertScanToString(scan))

    val job = Job.getInstance(conf)

    TableSnapshotInputFormat.setInput(job, "dm_test_snap", 
        new Path("hdfs://nameservice1/tmp"))

    val hBaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    hBaseRDD.count()

    System.exit(0)
  }

  def convertScanToString(scan : Scan) = {
      val proto = ProtobufUtil.toScan(scan);
      Base64.encodeBytes(proto.toByteArray());
  }

}

There are 3 answers

Holden (best answer)

Looking at the Job documentation, it makes a copy of the conf object you supply to it ("The Job makes a copy of the Configuration so that any necessary internal modifications do not reflect on the incoming parameter."), so most likely the settings you put on the conf object aren't getting passed down to Spark. You could instead use TableSnapshotInputFormatImpl, which has a similar method that works directly on conf objects. There might be additional things needed, but at a first pass through the problem this seems like the most likely cause.

As pointed out in the comments, another option is to use job.getConfiguration to get the updated config from the job object.
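
For illustration, here is a minimal sketch of that first option, assuming an HBase version where TableSnapshotInputFormatImpl (in org.apache.hadoop.hbase.mapreduce) exposes setInput(conf, snapshotName, restoreDir); it reuses the snapshot name, restore path, and convertScanToString helper from the question:

val conf = HBaseConfiguration.create()
// Describe the snapshot and its restore directory directly on the Configuration,
// so the settings end up on the exact object handed to Spark.
TableSnapshotInputFormatImpl.setInput(conf, "dm_test_snap",
    new Path("hdfs://nameservice1/tmp"))
// Still serialize a Scan into the config, as in the question's helper.
conf.set(TableInputFormat.SCAN, convertScanToString(new Scan))

val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableSnapshotInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])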

Vladimir Rodionov

You have not configured your M/R job properly. This is an example in Java of how to configure M/R over snapshots:

Job job = new Job(conf);
Scan scan = new Scan();
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
       scan, MyTableMapper.class, MyMapKeyOutput.class,
       MyMapOutputValueWritable.class, job, true);

You definitely skipped the Scan. I suggest taking a look at TableMapReduceUtil's initTableSnapshotMapperJob implementation to get an idea of how to configure the job in Spark/Scala.
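
For reference, a rough Scala sketch of that suggestion, handing the configured Job to Spark afterwards (IdentityTableMapper from org.apache.hadoop.hbase.mapreduce is used only to satisfy the mapper parameter, since Spark never invokes it; the snapshot name and restore path are placeholders taken from the question):

val job = Job.getInstance(HBaseConfiguration.create())
// One call wires the snapshot name, the Scan, and the restore directory
// into the job's configuration.
TableMapReduceUtil.initTableSnapshotMapperJob("dm_test_snap", new Scan(),
  classOf[IdentityTableMapper],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result],
  job, true, new Path("hdfs://nameservice1/tmp"))

// Read from the job's (now fully populated) configuration, not the original conf.
val hBaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])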

Sudarshan kumar

Here is the complete configuration in MapReduce (Java):

TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, // Name of the snapshot
                scan, // Scan instance to control CF and attribute selection
                DefaultMapper.class, // mapper class
                NullWritable.class, // mapper output key
                Text.class, // mapper output value
                job,
                true,
                restoreDir);