I'm using Flink to dynamically analyze JSON data: I key the stream by some given columns and sum another column. In my MapFunction I convert the JSON to a case class, but the resulting stream fails in keyBy with the error Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<java.lang.Object>) cannot be used as key.
My code looks like this:
//conf.properties
columns=a:String,b:Int,c:String,d:Long
declusteringColumns=a,c
statsColumns=b
//main function
stream.map(new MapFunc)
  // first key column, then the remaining ones (drop(1), so the first is not repeated)
  .keyBy(declusteringColumns(0), declusteringColumns.drop(1).toSeq: _*)
  .sum(statsColumns)
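// assumption: JSON here is fastjson's com.alibaba.fastjson.JSON
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class MapFunc extends RichMapFunction[String, Any] {
  var clazz: Class[_] = _

  override def open(parameters: Configuration): Unit = {
    import scala.reflect.runtime.universe
    import scala.tools.reflect.ToolBox
    val tb = universe.runtimeMirror(universe.getClass.getClassLoader).mkToolBox()
    // compile the case class at runtime and keep its runtime Class
    clazz = tb.compile(tb.parse(
      """|case class Test(a:String,b:Int,c:String,d:Long){}
         |scala.reflect.classTag[Test].runtimeClass"""
        .stripMargin)).apply().asInstanceOf[Class[_]]
  }

  // explicit result type: with procedure syntax, map would return Unit
  override def map(value: String): Any = {
    val tmp = JSON.parseObject(value)
    // read each configured column from the JSON object according to its declared type
    val values = Utils.loadProperties("columns").split(",").map(y => {
      val name = y.substring(0, y.indexOf(":"))
      val tpe = y.substring(y.indexOf(":") + 1)
      tpe.toLowerCase match {
        case "string" => tmp.getString(name)
        case "int" => tmp.getInteger(name)
        case "long" => tmp.getLong(name)
        case _ => null
      }
    }).toSeq
    clazz.getConstructors()(0).newInstance(values: _*)
  }
}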
How can I convert JSON to a case class or a tuple?
Actually, it turned out that the exception remains even for an ordinary case class (not generated via reflection).
The first issue is that this case class is not a POJO
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#pojos
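So you should replace the case class with a definition that follows the POJO rules from the page above: a public class with a public no-argument constructor whose fields are public or accessible through getters and setters.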
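The replacement could look roughly like the sketch below. It is not the snippet from the original answer; it only assumes the POJO rules from the linked Flink page (public class, public no-argument constructor, mutable fields with accessors), and the default values passed by the extra constructor are arbitrary.

```scala
// A sketch of a class intended to meet Flink's POJO rules (an assumption,
// not the original answer's code): every field is a mutable `var`, so Scala
// generates a getter and a setter, and there is a public no-arg constructor.
case class Test(var a: String, var b: Int, var c: String, var d: Long) {
  // Public no-argument constructor; the default values are arbitrary.
  def this() = this(null, 0, null, 0L)
}
```

With this definition `new Test()` creates an empty instance whose fields can be assigned afterwards, which is the shape a POJO serializer expects.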
The second issue may be that Flink doesn't allow inner-class POJOs that are not static inner classes, whereas the reflective toolbox generates a local class nested inside a method:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#rules-for-pojo-types
This is visible in the decompiled version of the toolbox-generated code. The full decompiled code:
https://gist.github.com/DmytroMitin/f1554ad833ea1bb9eb97947ae872d220
So if it's really necessary to generate a class for Flink, it should probably be generated manually rather than via the toolbox:
https://www.reddit.com/r/scala/comments/gfcmul/compile_scala_source_from_string_and/
https://www.reddit.com/r/scala/comments/jckld2/is_there_a_way_to_load_scala_code_at_runtime/
How to eval code that uses InterfaceStability annotation (that fails with "illegal cyclic reference involving class InterfaceStability")?
How do I programmatically compile and instantiate a Java class?
Dynamic compilation of multiple Scala classes at runtime
Tensorflow in Scala reflection
But the code with a class generated manually
https://gist.github.com/DmytroMitin/e33cd244b37f9b33b67f7ac3e6609d39
still throws This type (GenericType<java.lang.Object>) cannot be used as key. I guess the reason for that is the following (and this is the third issue).
The code with an ordinary case class (not generated) seems to work:
https://gist.github.com/DmytroMitin/af426d4578dd5e76c9e0d344e6f079ce
But if we replace the type Test with Any, then it throws This type (GenericType<java.lang.Object>) cannot be used as key:
https://gist.github.com/DmytroMitin/a23e45a546790630e838e60c7206adcd
And with reflection we can't return anything but Any.
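The mechanism can be illustrated without Flink. In the Scala API, map takes an implicit TypeInformation for the result type, and that implicit is chosen from the static type at compile time. The sketch below uses a hypothetical type class Info as a stand-in for TypeInformation (it is not a Flink type, and its string results are made up for illustration); the point is only that a value built via reflection is statically Any, so the generic instance is picked.

```scala
object StaticTypeSketch {
  case class Test(a: String, b: Int, c: String, d: Long)

  // Hypothetical stand-in for Flink's TypeInformation[T]; not a Flink API.
  trait Info[T] { def name: String }
  implicit val anyInfo: Info[Any] = new Info[Any] { val name = "generic" }
  implicit val testInfo: Info[Test] = new Info[Test] { val name = "case class" }

  // The instance is resolved at compile time from the static type T.
  def infoOf[T](value: T)(implicit info: Info[T]): String = info.name

  // Statically typed as Test.
  val typed: Test = Test("x", 1, "y", 2L)

  // Built via reflection: the same runtime class, but the static type is Any.
  val viaReflection: Any = classOf[Test].getConstructors()(0)
    .newInstance("x", Int.box(1), "y", Long.box(2L))

  val typedInfo: String = infoOf(typed)             // uses Info[Test]
  val reflectedInfo: String = infoOf(viaReflection) // uses Info[Any]
}
```

Both values have the same runtime class, yet only the statically typed one gets the specific instance. This is why a MapFunc declared as RichMapFunction[String, Any] ends up with a generic type that Flink refuses as a key.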
Now I'm creating TypeInformation[Test] inside the generated code. This seems to fix This type (GenericType<java.lang.Object>) cannot be used as key, but now I get a different error:
https://gist.github.com/DmytroMitin/16d312dbafeae54518f7ac2c490426b0
I resolved the issue with InvalidProgramException: UTF-8 is not serializable by annotating the fields of MapFunc with @transient:
https://gist.github.com/DmytroMitin/f2f859273075370c4687a30e0c3a2431
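Why @transient helps can be shown with plain Java serialization, which Flink uses to ship function objects. The sketch below is an illustration only: the class names are hypothetical, and a Charset field stands in for whichever non-serializable field the real MapFunc held (the source does not say which one it was).

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.nio.charset.{Charset, StandardCharsets}

object TransientSketch {
  // Hypothetical function-like class holding a non-serializable field.
  class WithoutTransient extends Serializable {
    var charset: Charset = StandardCharsets.UTF_8
  }

  // Same field, but marked @transient so serialization skips it.
  class WithTransient extends Serializable {
    @transient var charset: Charset = StandardCharsets.UTF_8
  }

  // Returns true if the object can be written with Java serialization.
  def isJavaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

A transient field is not restored after deserialization, so in a rich function it would have to be initialized again in open, as the question's MapFunc already does for clazz.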
Actually, it turned out that if we create the TypeInformation inside the generated code, then the toolbox is enough.