I am trying to use the case class extraction feature of json4s in Spark, i.e. calling jvalue.extract[MyCaseClass]. It works fine if I bring the JValue objects into the master and do the extraction there, but the same calls fail in the workers:
import org.json4s._
import org.json4s.jackson.JsonMethods._
import scala.util.{Try, Success, Failure}
val sqx = sqlContext
val data = sc.textFile(inpath).coalesce(2000)
case class PageView(
  client: Option[String]
)

def extract(json: JValue) = {
  implicit def formats = org.json4s.DefaultFormats
  Try(json.extract[PageView]).toOption
}
val json = data.map(parse(_)).sample(false, 1e-6).cache()
// count initial inputs
val raw = json.count
// count successful extractions locally -- same value as above
val loc = json.toLocalIterator.flatMap(extract).size
// distributed count -- always zero
val dist = json.flatMap(extract).count // always returns zero
// this throws "org.json4s.package$MappingException: Parsed JSON values do not match with class constructor"
json.map(x => {implicit def formats = org.json4s.DefaultFormats; x.extract[PageView]}).count
The implicit for Formats is defined locally in the extract function, since DefaultFormats is not serializable and defining it at top level caused it to be serialized for transmission to the workers rather than constructed there. I think the problem still has something to do with the remote initialization of DefaultFormats, but I am not sure what it is.
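For clarity, this is roughly the top-level variant I mean (a sketch of the shape that triggered the serialization problem, not the code I run now; the names are made up for illustration):

// Sketch only: with formats defined at top level, the closure passed to
// flatMap captures it, so Spark tries to serialize the DefaultFormats
// instance for shipment to the workers instead of constructing it there.
implicit val topLevelFormats = org.json4s.DefaultFormats

def extractTopLevel(jv: JValue): Option[PageView] =
  Try(jv.extract[PageView]).toOption

val distTopLevel = json.flatMap(extractTopLevel).count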
When I call the extract method directly, instead of my extract function, as in the last example, it no longer complains about serialization but just throws an error that the JSON does not match the expected structure.
How can I get the extraction to work when distributed to the workers?
Edit
@WesleyMiao has reproduced the problem and found that it is specific to spark-shell. He reports that this code works as a standalone application.
I got the same exception as yours when running your code in spark-shell. However, when I turn your code into a real Spark app and submit it to a standalone Spark cluster, I got the expected results with no exception.
Below is the code I put in a simple spark app.
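A minimal sketch of what such a standalone app could look like, reusing the PageView class and extract helper from the question (the object name, app name and input-path handling are placeholders, not necessarily his exact code):

// Minimal standalone Spark app sketch; the object/app names and the
// input-path argument are placeholders for illustration only.
import org.apache.spark.{SparkConf, SparkContext}
import org.json4s._
import org.json4s.jackson.JsonMethods._
import scala.util.Try

object PageViewApp {
  case class PageView(client: Option[String])

  def extract(json: JValue): Option[PageView] = {
    implicit val formats = org.json4s.DefaultFormats
    Try(json.extract[PageView]).toOption
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PageViewApp"))
    val json = sc.textFile(args(0)).map(parse(_)).cache()
    println(s"parsed = ${json.count}, extracted = ${json.flatMap(extract).count}")
    sc.stop()
  }
}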
And when I run it using spark-submit, the extraction succeeds as expected, with no exception.
And I am also sure that it is not running in "local[*]" mode.
Now I suspect the reason we got exceptions while running in spark-shell has something to do with how the case class PageView is defined in spark-shell and how spark-shell serializes / distributes it to the executors.
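One way to see what that suspicion is about (a sketch, assuming the same spark-shell session as above): a case class defined in the REPL is compiled as an inner class of the interpreter's wrapper objects, which shows up in its runtime class name.

// In spark-shell the runtime name is nested inside REPL wrapper classes,
// e.g. something like "$iwC$$iwC$...$PageView" rather than a plain
// top-level "PageView"; the exact wrapper names vary by Scala/Spark version.
println(classOf[PageView].getName)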