What is Needed:
The number of tables in the source database changes rapidly, so I don't want to keep editing case classes by hand; instead I generate them dynamically through Scala code and put them in a package. But now I am not able to read them dynamically. If this worked, I would iterate over the members of the object com.example.datasources.fileSystemSource.schema in a loop, as sketched below.
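For example, the loop I have in mind would look roughly like this (a sketch using runtime reflection against my package):

import scala.reflect.runtime.universe._

val mirror = runtimeMirror(getClass.getClassLoader)
val schemaClass =
  mirror.staticModule("com.example.datasources.fileSystemSource.schema")
    .moduleClass.asClass

// collect the fully qualified names of the case classes declared in object schema
val tableClassNames: Iterable[String] =
  schemaClass.toType.decls.collect {
    case c: ClassSymbol if c.isCaseClass => c.fullName
  }

tableClassNames.foreach(println)
// com.example.datasources.fileSystemSource.schema.Users
// com.example.datasources.fileSystemSource.schema.UserData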
What has already been Done:
I have some case classes dynamically generated from the schema of the database tables, as below:
import java.sql.Timestamp

object schema {
  case class Users(name: String,
                   favorite_color: String,
                   favorite_numbers: Array[Int])

  case class UserData(registration_dttm: Timestamp,
                      id: Int,
                      first_name: String,
                      last_name: String,
                      email: String,
                      gender: String,
                      ip_address: String,
                      cc: String,
                      country: String,
                      birthdate: String,
                      salary: Double,
                      title: String,
                      comments: String)
}
Then I have used them as the dynamic type T for reading data in the Load[T] class in my Loader.scala, as below:
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

class Load[T <: Product: Encoder](val tableName: String,
                                  val inputPath: String,
                                  val spark: SparkSession,
                                  val saveMode: String,
                                  val outputPath: String,
                                  val metadata: Boolean)
    extends Loader[T] {

  val fileSystemSourceInstance: FileSystem[T] =
    new FileSystem[T](inputPath, spark, saveMode, tableName)

  override def Load: Dataset[T] =
    fileSystemSourceInstance.provideData(metadata, outputPath).as[T]
}
Now, by using scala.reflect.api, I am able to get a TypeTag for my case classes:
import scala.reflect.api
import scala.reflect.runtime.universe._

def stringToTypeTag[A](name: String): TypeTag[A] = {
  val c = Class.forName(name)
  val mirror = runtimeMirror(c.getClassLoader)
  val sym = mirror.staticClass(name)
  val tpe = sym.selfType
  TypeTag(mirror, new api.TypeCreator {
    def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) =
      if (m eq mirror) tpe.asInstanceOf[U#Type]
      else throw new IllegalArgumentException(
        s"Type tag defined in $mirror cannot be migrated to other mirrors.")
  })
}
So if I now print my case class's type tag, I get:

val typetagDynamic = stringToTypeTag("com.example.datasources.fileSystemSource.schema.Users")
println(typetagDynamic)

TypeTag[com.example.datasources.fileSystemSource.schema.Users]
Problem:
I need to use this TypeTag (or the dynamically generated case classes) to encode my datasets, as below:
new Load[typetagDynamic](tableName, inputPath, spark,
  saveMode,
  outputPath + tableName,
  metadata)(Encoders.product[typetagDynamic]).Load
This gives me the error: Cannot resolve symbol typetagDynamic
If used like this:
new Load[typetagDynamic.type](tableName, inputPath, spark,
  saveMode,
  outputPath + tableName,
  metadata)(Encoders.product[typetagDynamic.type]).Load
This gives me the error: type arguments [T] do not conform to method product's type parameter bounds [T <: Product]
Answer:
If you know a type schema.Users only at runtime, try to replace the direct new Load[schema.Users](...) call with code compiled at runtime by a toolbox, so that the type name can be spliced into the source as a string.
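A minimal sketch of that idea; the package paths are assumed from the question, and SparkHolder is hypothetical plumbing that makes runtime values such as the SparkSession reachable from toolbox-compiled code (the linked gist has a complete version):

import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox

val tb = currentMirror.mkToolBox()

val tableName = "Users" // type name, known only at runtime

// compile and run the call with the type name spliced into the source text;
// an Encoder must be resolvable inside the generated scope (see the two options below)
val ds = tb.eval(tb.parse(
  s"""
     |import org.apache.spark.sql.Encoders
     |import com.example.datasources.fileSystemSource.schema._
     |new com.example.datasources.fileSystemSource.Load[$tableName](
     |  "$tableName",
     |  "/data/in/$tableName",
     |  com.example.SparkHolder.spark,
     |  "overwrite",
     |  "/data/out/$tableName",
     |  false
     |)(Encoders.product[$tableName]).Load
   """.stripMargin))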
You need either to define an instance of the type class org.apache.spark.sql.Encoder for Users in its companion object (so that the instance will be in implicit scope), roughly as sketched below:
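import org.apache.spark.sql.{Encoder, Encoders}

object schema {
  case class Users(name: String,
                   favorite_color: String,
                   favorite_numbers: Array[Int])

  object Users {
    // picked up automatically whenever an implicit Encoder[Users] is required
    implicit val usersEncoder: Encoder[Users] = Encoders.product[Users]
  }
}

(Your generator would have to emit such a companion alongside each case class.)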
Or to import instances of Encoder for case classes via import spark.implicits._. But you need to import them not into the current local scope but into the toolbox-generated local scope, so in this case the import must go inside the code string handed to the toolbox, as sketched below.
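That is, the generated snippet changes roughly like this (SparkHolder is again a hypothetical accessor object):

tb.eval(tb.parse(
  s"""
     |val spark = com.example.SparkHolder.spark // hypothetical accessor
     |import spark.implicits._                  // Encoders now in the generated scope
     |import com.example.datasources.fileSystemSource.schema._
     |new com.example.datasources.fileSystemSource.Load[$tableName](
     |  "$tableName", "/data/in/$tableName", spark,
     |  "overwrite", "/data/out/$tableName", false
     |).Load
   """.stripMargin))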
See the whole code: https://gist.github.com/DmytroMitin/2cad52c27f5360ae9b1e7503d6f6cd00
See also: https://groups.google.com/g/scala-internals/c/ta-vbUT6JE8