Impossible to operate on custom type after it is encoded? Spark Dataset


Say you have this (the approach to encoding a custom type is taken from this thread):

// assume we handle custom type
class MyObj(val i: Int, val j: String)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
val ds = spark.createDataset(Seq(new MyObj(1, "a"),new MyObj(2, "b"),new MyObj(3, "c")))

When I call ds.show, I get:

+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

I understand that this is because the contents are encoded into Spark SQL's internal binary representation. But how can I display the decoded content, like this?

+---+---+
| _1| _2|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
+---+---+

UPDATE1

Displaying the content is not the biggest issue. More importantly, the encoding can cause problems when processing the dataset. Consider this example:

// continue with the above code
val ds2 = spark.createDataset(Seq(new MyObj(2, "a"),new MyObj(6, "b"),new MyObj(5, "c"))) 

ds.joinWith(ds2, ds("i") === ds2("i"), "inner") 
// this fails at runtime with: org.apache.spark.sql.AnalysisException: Cannot resolve column name "i" among (value);

Does this mean that a Kryo-encoded type cannot conveniently be used in operations like joinWith?

How do we process a custom type in a Dataset then?
If we cannot process it after it is encoded, what is the point of Kryo-encoding a custom type?

(The solution provided by @jacek below is good to know for case classes, but it still cannot decode a custom non-case class.)

Best answer, by Jacek Laskowski

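The following worked for me, but it feels like using a high-level API to do low-level (deserialization) work.

This is not to say it should be done this way, only that it is possible.

Encoders.kryo stores each object as a single binary column named value, so untyped operations such as show or a column lookup like ds("i") see only the serialized bytes. Spark turns the bytes back into the object only inside typed operations such as map or filter.

One major difference between your class definition and mine is the case keyword, which is what lets me use the following trick. I have not verified exactly why, but most likely it is because udf can derive a struct schema for a case class return type, whereas it has no schema for a plain class.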

scala> println(spark.version)
3.0.1

// Note the case keyword (constructor parameters of a case class are vals already)
case class MyObj(i: Int, j: String)
import org.apache.spark.sql.Encoders
implicit val myObjEncoder = Encoders.kryo[MyObj]
// myObjEncoder: org.apache.spark.sql.Encoder[MyObj] = class[value[0]: binary]

val ds = (Seq(new MyObj(1, "a"),new MyObj(2, "b"),new MyObj(3, "c"))).toDS
// the Kryo encoder stores each object as bytes in a single binary column
scala> ds.printSchema
root
 |-- value: binary (nullable = true)

scala> :type sc
org.apache.spark.SparkContext

// Let's deserialize the bytes into an object
import org.apache.spark.serializer.KryoSerializer
val ks = new KryoSerializer(sc.getConf)
// that begs for a generic UDF
val deserMyObj = udf { value: Array[Byte] =>
  import java.nio.ByteBuffer
  // pass the target type explicitly instead of casting the result afterwards
  ks.newInstance.deserialize[MyObj](ByteBuffer.wrap(value))
}

val solution = ds.select(deserMyObj('value) as "result").select($"result.*")
scala> solution.show
+---+---+
|  i|  j|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
+---+---+
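
For the plain (non-case) class from the question, where the UDF trick above does not apply, one possible route is to stay with typed operations, since those are where Spark hands you the deserialized object. The sketch below is not part of the original answer and makes a few assumptions: the names objEnc, keyedEnc, readable, k1, k2, joined and matches are made up for illustration, and the session is created locally only so the snippet is self-contained (in spark-shell the first two statements after the import can be skipped). It shows two things: mapping to a tuple so that show has real columns, and pairing each object with its join key through Encoders.tuple so that joinWith has a column to resolve.

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// Assumption: a local session for a standalone run. In spark-shell, `spark`
// and its implicits already exist, so these two lines can be skipped.
val spark = SparkSession.builder().master("local[*]").appName("kryo-sketch").getOrCreate()
import spark.implicits._

// Plain (non-case) class, as in the question.
class MyObj(val i: Int, val j: String)

// Passed explicitly rather than as an implicit, to avoid clashing with
// the encoders brought in by spark.implicits._
val objEnc: Encoder[MyObj] = Encoders.kryo[MyObj]

val ds  = spark.createDataset(Seq(new MyObj(1, "a"), new MyObj(2, "b"), new MyObj(3, "c")))(objEnc)
val ds2 = spark.createDataset(Seq(new MyObj(2, "a"), new MyObj(6, "b"), new MyObj(5, "c")))(objEnc)

// 1. Display: map to a tuple inside a typed operation, where the object
//    is already deserialized, then name the columns.
val readable = ds.map(o => (o.i, o.j)).toDF("i", "j")
readable.show()

// 2. Join: keep the join key in its own column next to the Kryo blob.
//    Encoders.tuple yields columns _1 (int) and _2 (binary).
val keyedEnc: Encoder[(Int, MyObj)] = Encoders.tuple(Encoders.scalaInt, objEnc)
val k1 = ds.map(o => (o.i, o))(keyedEnc)
val k2 = ds2.map(o => (o.i, o))(keyedEnc)

val joined = k1.joinWith(k2, k1("_1") === k2("_1"), "inner")

// Pull the objects back out in a typed map; only i = 2 exists on both sides.
val matches = joined.map { case ((_, a), (_, b)) => (a.i, a.j, b.j) }.collect()
// matches: Array((2,b,a))
```

The trade-off is that the key is stored twice (once as a column, once inside the serialized object), and anything not exposed as its own column remains invisible to Spark SQL, so filters or aggregations on other fields still have to go through typed lambdas.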