I have the following code, in which I maintain a large List: it goes over the data stream and builds an inverted index. I use the Twitter Scalding API, and dataTypePipe is of type TypedPipe:
lazy val cats = dataTypePipe.cross(cmsCats)
  .map(vf => (vf._1.itemId, vf._1.leafCats, vf._2))
  .flatMap {
    case (id, categorySet, cHhitters) => categorySet.map(cat => (
      ...
  }
  .filter(f => f._2.nonEmpty)
  .group.withReducers(4000)
  .sum
  .map {
    case ((token, bucket), ids) =>
      toIndexedRecord(ids, token, bucket)
  }
Due to a serialization issue, I convert the Scala list to a Java list and use Avro to write it:
import scala.collection.JavaConverters._

def toIndexedRecord(ids: List[Long], token: String, bucket: Int): IndexRecord = {
  val javaList = ids.map(l => l: java.lang.Long).asJava // need to convert from Scala Long to java.lang.Long
  new IndexRecord(token, bucket, javaList)
}
But the problem is that the large amount of information kept in the list causes a Java heap issue. I believe the summing is also a contributor to this:
2013-08-25 16:41:09,709 WARN org.apache.hadoop.mapred.Child: Error running child
cascading.pipe.OperatorException: [_pipe_0*_pipe_1][com.twitter.scalding.GroupBuilder$$anonfun$1.apply(GroupBuilder.scala:189)] operator Every failed executing operation: MRMAggregator[decl:'value']
at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:136)
at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:39)
at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:49)
at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28)
at cascading.flow.hadoop.stream.HadoopGroupGate.run(HadoopGroupGate.java:90)
at cascading.flow.hadoop.FlowReducer.reduce(FlowReducer.java:133)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:522)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:421)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1232)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.OutOfMemoryError: Java heap space
at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
at scala.collection.immutable.List.$colon$colon$colon(List.scala:127)
at scala.collection.immutable.List.$plus$plus(List.scala:193)
at com.twitter.algebird.ListMonoid.plus(Monoid.scala:86)
at com.twitter.algebird.ListMonoid.plus(Monoid.scala:84)
at com.twitter.scalding.KeyedList$$anonfun$sum$1.apply(TypedPipe.scala:264)
at com.twitter.scalding.MRMAggregator.aggregate(Operations.scala:279)
at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:128)
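As far as I can tell, the trace points at Algebird's ListMonoid, whose plus is plain list concatenation, so .sum ends up building the complete per-key list in memory. A minimal illustration of what I think is happening (assuming Algebird is on the classpath):

import com.twitter.algebird.Monoid

// ListMonoid.plus just concatenates, so summing List values accumulates
// every id for a key into one in-memory list.
val merged: List[Long] = Monoid.plus(List(1L, 2L), List(3L, 4L))
// merged == List(1L, 2L, 3L, 4L)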
So my question is: what can I do to avoid this situation?
Try .forceToReducers before the .sum. This OOM is happening map-side, as we are caching values. That may not help in your case.
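For concreteness, here is a rough sketch of where that call would sit; the pipe name and the value types (a (token, bucket) key with a List[Long] of ids) are assumptions based on your pipeline, not your actual code:

import com.twitter.scalding.TypedPipe

// Sketch only: tokenBucketToIds stands in for the output of your
// flatMap/filter stage (the name is hypothetical).
def sumOnReducers(
    tokenBucketToIds: TypedPipe[((String, Int), List[Long])]
): TypedPipe[((String, Int), List[Long])] =
  tokenBucketToIds
    .group
    .withReducers(4000)
    .forceToReducers // skip map-side caching of partial sums; fold on the reducers
    .sum
    .toTypedPipe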
If the lists are truly too large, however, there is really very little that can be done.