Add ADT column in Spark dataset?


I want to create a dataset which contains an ADT column. Based on this question: Encode an ADT / sealed trait hierarchy into Spark DataSet column, I know that there is a solution which encodes with kryo, but that is not really helpful. There's another way to solve the issue which is far better. Let's define the following ADT:

sealed case class Animal(sound: String)
object Cat extends Animal("miau")
object Dog extends Animal("wuff") 

and define a case class which uses Animal:

case class Pet(name: String, sound: Animal)

I can now easily create a dataset from Pet:

val ds = List(Pet("Tom", Cat), Pet("Beethoven", Dog)).toDS
ds.show()
+---------+------+
|     name| sound|
+---------+------+
|      Tom|[miau]|
|Beethoven|[wuff]|
+---------+------+

Note that sound is a struct, but extracting its element is straightforward:

ds.select("name", "sound.*").show()
+---------+-----+
|name     |sound|
+---------+-----+
|Tom      |miau |
|Beethoven|wuff |
+---------+-----+

Actually this is the final structure that I want to achieve. There are two problems that I am facing.

  1. It is usually not a good idea to inherit from a case class
  2. Exhaustive pattern matching asks for a default case

Example for problem 2:

 def getSound(animal: Animal): String = animal match {
   case Cat => Cat.sound
   case Dog => Dog.sound
   case _ => ""
 }
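One likely reason the default case is needed: `Animal` is itself a concrete case class, so values other than `Cat` and `Dog` can exist. The sketch below is plain Scala with no Spark involved; the wrapper object `DefaultCaseDemo` and the value `cow` are hypothetical names added for illustration.

```scala
sealed case class Animal(sound: String)
object Cat extends Animal("miau")
object Dog extends Animal("wuff")

object DefaultCaseDemo {
  def getSound(animal: Animal): String = animal match {
    case Cat => Cat.sound
    case Dog => Dog.sound
    case _   => "" // reached by any Animal that is neither Cat nor Dog
  }

  // A perfectly legal Animal that is not one of the two objects.
  val cow: Animal = Animal("moo")
}
```

Here `DefaultCaseDemo.getSound(DefaultCaseDemo.cow)` falls through to the default branch, which is why the first two cases alone cannot cover every `Animal`.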

To overcome problem 2, I thought of creating a sealed abstract class. I also want to make it a Product:

sealed abstract class Animal(sound: String) extends Product
case object Cat extends Animal("miau")
case object Dog extends Animal("wuff")

Now problem 2 is handled, and the default case is no longer required. However, I am not able to create a dataset from Animal. I get the following exception: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: Couldn't find sound on class Animal
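A side note on the definition above: in a non-case class, a constructor parameter that is not marked `val` is not a public member, so `Cat.sound` cannot be read from outside the class. The following is a minimal plain-Scala sketch, assuming `sound` is promoted to a `val`; `ExhaustiveDemo` is a hypothetical wrapper name. It only illustrates the accessor and the exhaustive match and makes no claim about whether Spark can derive an encoder for this type.

```scala
// `val` exposes sound as a public accessor on every Animal.
sealed abstract class Animal(val sound: String) extends Product with Serializable
case object Cat extends Animal("miau")
case object Dog extends Animal("wuff")

object ExhaustiveDemo {
  // No default case: Animal is sealed and abstract, so Cat and Dog are the only values.
  def getSound(animal: Animal): String = animal match {
    case Cat => Cat.sound
    case Dog => Dog.sound
  }
}
```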

What I really want is to get the same behaviour as Option. We can create a case class which contains an optional field:

case class Person(name: String, age: Option[Int])
List(Person("Jack", Some(26)), Person("Julia", None)).toDS.show()
+-----+----+
| name| age|
+-----+----+
| Jack|  26|
|Julia|null|
+-----+----+

I checked the implementation of Option and it's a sealed abstract class too, so what am I missing? How is Option encoded for datasets?

UPDATE

Sorry, the last part about Option doesn't make much sense here, since there you need to explicitly write the value which you want to see in the dataset in the end.

But the question remains: how can I encode a column created from an ADT with proper pattern matching?

2

There are 2 answers

2
sanyi14ka On

What I was missing was an apply method for my Animal class.

sealed abstract class Animal(val sound: String) extends Product with Serializable
  case object Cat extends Animal(sound = "miau")
  case object Dog extends Animal(sound = "wuff")
  object Animal {
    def apply(animal: Animal): String = animal match {
      case Cat => Cat.sound
      case Dog => Dog.sound
    }
  }

Using this, I can obtain almost the desired result:

val ds = List(Pet("Tom", Cat), Pet("Beethoven", Dog)).toDS
ds.show()
+---------+------+
|     name| sound|
+---------+------+
|      Tom|[miau]|
|Beethoven|[wuff]|
+---------+------+
0
Atais On

The answer by sanyi14ka will actually never work.

The topic of ADT/Enum Encoders in DataSet is not new, but even today it does not work properly.
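One possible workaround, which is a different technique from what either the question or the other answer proposes, is to keep the ADT in application code and convert it to a plain string at the dataset boundary. The sketch below is plain Scala and does not touch Spark; `PetRow`, `AnimalCodec`, `toRow` and `fromRow` are hypothetical names introduced only for illustration.

```scala
sealed abstract class Animal(val sound: String) extends Product with Serializable
case object Cat extends Animal("miau")
case object Dog extends Animal("wuff")

case class Pet(name: String, sound: Animal)

// Flat mirror of Pet that holds only strings.
case class PetRow(name: String, sound: String)

object AnimalCodec {
  // Every known Animal; must be kept in sync with the ADT by hand.
  private val all: List[Animal] = List(Cat, Dog)

  def toRow(pet: Pet): PetRow = PetRow(pet.name, pet.sound.sound)

  // Returns None when the stored sound matches no known Animal.
  def fromRow(row: PetRow): Option[Pet] =
    all.find(_.sound == row.sound).map(animal => Pet(row.name, animal))
}
```

Since `PetRow` contains only String fields, a `List[Pet]` mapped through `AnimalCodec.toRow` should be usable with `toDS` through the ordinary case class encoder, assuming the SparkSession implicits are in scope. The pattern matching on `Animal` then stays exhaustive on the Scala side.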

You might find these two links useful: