Error calling `JValue.extract` from distributed operations in spark-shell

I am trying to use the case class extraction feature of json4s in Spark, i.e. calling jvalue.extract[MyCaseClass]. It works fine if I bring the JValue objects into the master and do the extraction there, but the same calls fail on the workers:

import org.json4s._
import org.json4s.jackson.JsonMethods._
import scala.util.{Try, Success, Failure}

val sqx = sqlContext

val data = sc.textFile(inpath).coalesce(2000)

case class PageView(
 client:  Option[String]
)

def extract(json: JValue) = {
  implicit def formats = org.json4s.DefaultFormats
  Try(json.extract[PageView]).toOption
}

val json = data.map(parse(_)).sample(false, 1e-6).cache()

// count initial inputs
val raw = json.count 


// count successful extractions locally -- same value as above
val loc = json.toLocalIterator.flatMap(extract).size

// distributed count -- always zero
val dist = json.flatMap(extract).count // always returns zero

// this throws  "org.json4s.package$MappingException: Parsed JSON values do not match with class constructor"
json.map(x => {implicit def formats = org.json4s.DefaultFormats; x.extract[PageView]}).count

The implicit for Formats is defined locally in the extract function since DefaultFormats is not serializable, and defining it at top level caused it to be serialized for transmission to the workers rather than constructed there. I think the problem still has something to do with the remote initialization of DefaultFormats, but I am not sure what it is.
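For contrast, here is a sketch (mine, not part of the original session) of the top-level variant described above as failing:

// Defining the formats at top level means the closures below capture it,
// so Spark has to serialize the formats value for shipping to the workers
// instead of constructing it there -- the problem described above.
implicit val formats = org.json4s.DefaultFormats

// json.flatMap(j => Try(j.extract[PageView]).toOption)  // relies on the captured formats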

When I call the extract method directly, instead of my extract function, as in the last example, it no longer complains about serialization but just throws an error that the JSON does not match the expected structure.

How can I get the extraction to work when distributed to the workers?


Edit

@WesleyMiao has reproduced the problem and found that it is specific to spark-shell. He reports that this code works as a standalone application.

There are 2 answers

Wesley Miao (best answer)

I got the same exception as yours when running your code in spark-shell. However, when I turned your code into a real Spark app and submitted it to a standalone Spark cluster, I got the expected results with no exception.

Below is the code I put in a simple spark app.

// Imports and the PageView case class are the same as in the question.
import org.json4s._
import org.json4s.jackson.JsonMethods._

case class PageView(client: Option[String])

val data = sc.parallelize(Seq("""{"client":"Michael"}""", """{"client":"Wesley"}"""))

val json = data.map(parse(_))

// The formats are constructed once per partition, on the executors,
// so nothing non-serializable is shipped from the driver.
val dist = json.mapPartitions { jsons =>
  implicit val formats = org.json4s.DefaultFormats
  jsons.map(_.extract[PageView])
}

dist.collect() foreach println

And when I ran it using spark-submit, I got the following result.

PageView(Some(Michael))                                                                                                                                       
PageView(Some(Wesley))

And I am also sure that it is not running in "local[*]" mode.
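One way to confirm that (not mentioned in the original answer) is to print the master URL from the driver:

sc.master   // e.g. "spark://host:7077" rather than "local[*]"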

Now I suspect the reason we got exceptions while running in spark-shell has something to do with how the case class PageView is defined in spark-shell and how spark-shell serializes / distributes it to the executors.
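If the REPL-defined case class is indeed the culprit, one workaround I would try (not verified by either answer) is to compile PageView outside the shell, so json4s reflects over a plain top-level case class rather than one nested inside the spark-shell wrapper objects. The file and package names below are hypothetical:

// PageView.scala, compiled separately into a jar, e.g. pageviews.jar
package pageviews

case class PageView(client: Option[String])

Then start the shell with something like spark-shell --jars pageviews.jar and import pageviews.PageView before parsing.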

vvladymyrov

As suggested here, I would move the object creation into the map. That is, I would have a function createPageViews that has extract as an internal function, and pass createPageViews to the workers.

More precisely, I would use mapPartitions instead of map, so createPageViews (and its internal function definition) is called only once per partition, not once per record.
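A minimal sketch of that suggestion, reusing the imports, the PageView case class, and the json RDD from the question (createPageViews is the answer's name; the body is illustrative, not the answer's actual code):

def createPageViews(jsons: Iterator[JValue]): Iterator[PageView] = {
  // Construct the formats on the executor, once per partition.
  implicit val formats = org.json4s.DefaultFormats

  // Internal extraction function, defined where the formats are in scope.
  def extract(json: JValue): Option[PageView] =
    Try(json.extract[PageView]).toOption

  jsons.flatMap(extract)
}

// Each partition invokes createPageViews once, instead of rebuilding the
// formats for every record as a plain map(extract) would.
val views = json.mapPartitions(createPageViews)
views.count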