What is the replacement for summing list in Scala-Scalding

I have the following code, where I maintain a large List. What I do here is go over the data stream and create an inverted index. I use the Twitter Scalding API, and dataTypePipe is of type TypedPipe:

  lazy val cats = dataTypePipe.cross(cmsCats)
    .map(vf => (vf._1.itemId, vf._1.leafCats, vf._2))
    .flatMap {
      case (id, categorySet, cHhitters) => categorySet.map(cat => (
      ...
    }
    .filter(f => f._2.nonEmpty)
    .group.withReducers(4000)
    .sum
    .map {
      case ((token, bucket), ids) =>
        toIndexedRecord(ids, token, bucket)
    }

Due to a serialization issue, I convert the Scala list to a Java list and use Avro to write it out:

  import scala.collection.JavaConverters._

  def toIndexedRecord(ids: List[Long], token: String, bucket: Int): IndexRecord = {
    // need to convert from scala.Long to java.lang.Long
    val javaList = ids.map(l => l: java.lang.Long).asJava
    new IndexRecord(token, bucket, javaList)
  }

But the issue is that keeping a large amount of information in the list causes a Java heap problem. I believe the summing is also a contributor to this issue:

2013-08-25 16:41:09,709 WARN org.apache.hadoop.mapred.Child: Error running child
cascading.pipe.OperatorException: [_pipe_0*_pipe_1][com.twitter.scalding.GroupBuilder$$anonfun$1.apply(GroupBuilder.scala:189)] operator Every failed executing operation: MRMAggregator[decl:'value']
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:136)
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:39)
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:49)
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28)
    at cascading.flow.hadoop.stream.HadoopGroupGate.run(HadoopGroupGate.java:90)
    at cascading.flow.hadoop.FlowReducer.reduce(FlowReducer.java:133)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:522)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:421)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1232)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
    at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
    at scala.collection.immutable.List.$colon$colon$colon(List.scala:127)
    at scala.collection.immutable.List.$plus$plus(List.scala:193)
    at com.twitter.algebird.ListMonoid.plus(Monoid.scala:86)
    at com.twitter.algebird.ListMonoid.plus(Monoid.scala:84)
    at com.twitter.scalding.KeyedList$$anonfun$sum$1.apply(TypedPipe.scala:264)
    at com.twitter.scalding.MRMAggregator.aggregate(Operations.scala:279)
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:128)

So my question is: what can I do to avoid this situation?

There are 2 answers

Oscar Boykin (BEST ANSWER)

Try .forceToReducers before the .sum. This OOM is happening map-side because we are caching values; that said, it may not help in your case.
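A minimal, self-contained sketch of that placement, assuming a made-up IndexEntry input type (the .group/.withReducers/.sum shape matches the question's pipeline):

  import com.twitter.scalding._

  // Hypothetical input type standing in for the question's data.
  case class IndexEntry(token: String, bucket: Int, id: Long)

  def invertedIndex(entries: TypedPipe[IndexEntry]): TypedPipe[((String, Int), List[Long])] =
    entries
      .map(e => ((e.token, e.bucket), List(e.id)))
      .group
      .withReducers(4000)
      .forceToReducers // build the lists only on the reducers, no map-side caching
      .sum             // list concatenation via Algebird's ListMonoid (see the stack trace)
      .toTypedPipe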

If the lists are truly too large, however, there is really very little that can be done.

samthebest

Quick but unscalable answer: try increasing mapred.child.java.opts.
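For example, when launching through hadoop jar and Scalding's Tool (the jar name, job class, and 2g heap value here are placeholders, and the exact flag placement depends on your launcher):

  hadoop jar your-assembly.jar com.twitter.scalding.Tool \
    -Dmapred.child.java.opts=-Xmx2g \
    com.example.YourJob --hdfs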

Better answer: well, it's a little tricky to understand the question because I don't know the types of your vals, and I don't know what f and vf are because you haven't named them informatively. If you provide the minimal amount of code required so that I can paste it into an IDE and have a play around, then I might find your problem.

sum might be where the OOM happens, but it is not what is causing it - refactoring to do the sum in a different way won't help.

Chances are you're crossing on something too big to fit in memory, so mapred.child.java.opts might be the only solution for you unless you completely restructure your data. Note that cross calls crossWithTiny - now, tiny means tiny :) One way to restructure is a keyed, reduce-side join instead of the cross, as sketched below.
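A minimal sketch of such a restructuring, assuming both sides can be keyed on a shared category id; the Item and CatMeta types and their fields are hypothetical, not from the question:

  import com.twitter.scalding._

  // Hypothetical types standing in for dataTypePipe and cmsCats.
  case class Item(itemId: Long, catId: Long)
  case class CatMeta(catId: Long, name: String)

  // A reduce-side join instead of cross: neither side is replicated to every mapper.
  def withCategories(items: TypedPipe[Item], cats: TypedPipe[CatMeta]): TypedPipe[(Item, CatMeta)] =
    items.groupBy(_.catId)
      .join(cats.groupBy(_.catId))
      .toTypedPipe
      .map { case (_, pair) => pair } // drop the join key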