HBase Get/Scan in a Scalding job


I'm using Scalding with Spyglass to read from/write to HBase.

I'm doing a left outer join of table1 and table2 and writing the result back to table1 after transforming a column. Both table1 and table2 are declared as Spyglass HBaseSource.

This works fine. However, to compute the transformed value I also need to read a different row of table1 by its row key.

I tried the following for the HBase get:

val hTable = new HTable(conf, TABLE_NAME)
val result = hTable.get(new Get(rowKey.getBytes()))

I get access to the Configuration in the Scalding job as described at this link:

https://github.com/twitter/scalding/wiki/Frequently-asked-questions#how-do-i-access-the-jobconf

This works when I run the Scalding job locally. But when I run it on the cluster, conf is null when this code executes in the reducer.

Is there a better way to do HBase get/scan in a Scalding/Cascading job for cases like this?


There is 1 answer

Answered by adev

There are a couple of ways to do this:

1) You can use a managed resource

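import com.twitter.scalding._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Get, HTablePool}
import resource._ // scala-arm, provides managed(...)
import scala.collection.JavaConverters._

class SomeJob(args: Args) extends Job(args) {
  // addResource returns Unit, so it cannot be chained onto create().
  // @transient lazy: built on first use in each JVM instead of being serialized with the job.
  @transient lazy val someConfig = {
    val c = HBaseConfiguration.create()
    c.addResource(new Path(pathtoyourxmlfile))
    c
  }
  @transient lazy val hPool = new HTablePool(someConfig, 3)

  def getConf = {
    implicitly[Mode] match {
      case Hdfs(_, conf) => conf
      case _ => ... // whatever you are doing for a local conf
    }
  }
  ... somePipe.someOperation.... {
        // keys is assumed to be a Seq[Array[Byte]] of row keys to fetch in one batched get
        val gets = keys.map { key => new Get(key) }
        // managed(...) closes the table once the block finishes
        managed(hPool.getTable("myTableName")) acquireAndGet { table =>
          val results = table.get(gets.asJava)
          ...do something with these results
        }
     }
}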
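The pool above is declared lazy for a reason that may also bear on the null conf in the question: on a cluster, the code inside an operation runs in a different JVM from the one that constructed the job, so whatever it touches must either survive serialization or be rebuilt on the task side. The sketch below shows that effect with plain Java serialization and no HBase dependency. `Resource`, `EagerHolder`, `LazyHolder`, and `SerializationDemo` are hypothetical names, and the in-memory round trip only approximates how a job's closures are shipped to tasks.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable client (for example a table pool).
class Resource(val name: String)

// Eager transient field: skipped during serialization, so it is null after the round trip.
class EagerHolder extends java.io.Serializable {
  @transient val resource: Resource = new Resource("eager")
}

// Lazy transient field: also skipped, but rebuilt on first access in the receiving JVM.
class LazyHolder extends java.io.Serializable {
  @transient lazy val resource: Resource = new Resource("lazy")
}

object SerializationDemo {
  // Serialize an object to bytes and read it back, all in memory.
  def roundTrip[A <: java.io.Serializable](a: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(a)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

After `SerializationDemo.roundTrip`, the `resource` of an `EagerHolder` is null, while a `LazyHolder` produces a fresh `Resource` on first access. Building the HBase configuration and pool lazily inside the job follows the same idea.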

2) You can use more specific Cascading code: write a custom scheme and override its source method (and possibly some others, depending on your needs). In there you can access the JobConf like this:

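import cascading.flow.FlowProcess
import cascading.flow.hadoop.HadoopFlowProcess
import cascading.scheme.Scheme
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

class MyScheme extends Scheme[JobConf, SomeRecordReader, SomeOutputCollector, ..] {

  // Filled in from the flow process on the task side; not serialized with the scheme.
  @transient var jobConf: Configuration = _

  override def source(flowProcess: FlowProcess[JobConf], ...): Boolean = {
    jobConf = flowProcess match {
      case h: HadoopFlowProcess => h.getJobConf
      case _ => jobConf // not running on Hadoop: keep whatever was set before
    }

    ... do something with the jobConf here

  }

}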
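The match on the flow process can also be written so that the non-Hadoop case is explicit instead of falling back to a mutable variable. This is a minimal sketch of that idea using hypothetical stand-in types: `Process`, `HadoopProcess`, `LocalProcess`, and `ConfLookup` are not Cascading classes, and a plain `Map` stands in for the JobConf. In real code the match would be on Cascading's `HadoopFlowProcess`, as in the snippet above.

```scala
// Hypothetical stand-ins for a flow process hierarchy; not Cascading classes.
sealed trait Process
final case class HadoopProcess(jobConf: Map[String, String]) extends Process
case object LocalProcess extends Process

object ConfLookup {
  // Return the job configuration only when running on the Hadoop-like process.
  def jobConfOf(p: Process): Option[Map[String, String]] = p match {
    case HadoopProcess(conf) => Some(conf)
    case LocalProcess        => None
  }

  // Look up one setting, using the default in local mode or when the key is unset.
  def setting(p: Process, key: String, default: String): String =
    jobConfOf(p).flatMap(_.get(key)).getOrElse(default)
}
```

Returning an `Option` makes the caller decide what to do without a configuration, rather than discovering a null later inside the reducer.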