Dynamically parse JSON in a Flink map function


I'm using Flink to analyze JSON data dynamically: the stream is keyed by, and summed over, columns that are given in a configuration file. In my MapFunction I convert the JSON to a case class, but the resulting stream is rejected by keyBy with this error:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<java.lang.Object>) cannot be used as key.

My code looks like this:

//conf.properties
columns=a:String,b:Int,c:String,d:Long
declusteringColumns=a,c
statsColumns=b

//main function
stream.map(new MapFunc)
      // first key field, then the remaining ones (drop(1), otherwise the first field is passed twice)
      .keyBy(declusteringColumns(0), declusteringColumns.drop(1).toSeq: _*)
      .sum(statsColumns)

class MapFunc extends RichMapFunction[String, Any] {
  var clazz: Class[_] = _

  override def open(parameters: Configuration): Unit = {
    import scala.reflect.runtime.universe
    import scala.tools.reflect.ToolBox
    val tb = universe.runtimeMirror(universe.getClass.getClassLoader).mkToolBox()
    // compile the case class at runtime and keep its runtime class
    clazz = tb.compile(tb.parse(
      """|case class Test(a:String,b:Int,c:String,d:Long){}
         |scala.reflect.classTag[Test].runtimeClass"""
        .stripMargin)).apply().asInstanceOf[Class[_]]
  }

  // explicit result type: procedure syntax would make map return Unit
  override def map(value: String): Any = {
    val tmp = JSON.parseObject(value)
    // read each configured column from the JSON object according to its declared type
    val values = Utils.loadProperties("columns").split(",").map(y => {
      val name = y.substring(0, y.indexOf(":"))
      val tpe = y.substring(y.indexOf(":") + 1)
      tpe.toLowerCase match {
        case "string" => tmp.getString(name)
        case "int" => tmp.getInteger(name)
        case "long" => tmp.getLong(name)
        case _ => null
      }
    }).toSeq
    clazz.getConstructors()(0).newInstance(values: _*)
  }
}

How can I convert JSON to a case class or tuple?

Answer by Dmytro Mitin

Actually, it turns out that the exception

org.apache.flink.api.common.InvalidProgramException: 
This type (GenericType<Test>) cannot be used as key 

persists even for an ordinary case class (one not generated via reflection):

case class Test(a: String, b: Int, c: String, d: Long)

The first issue is that this case class is not a POJO

https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#pojos

POJOs

Java and Scala classes are treated by Flink as a special POJO data type if they fulfill the following requirements:

  • The class must be public.

  • It must have a public constructor without arguments (default constructor).

  • All fields are either public or must be accessible through getter and setter functions. For a field called foo the getter and setter methods must be named getFoo() and setFoo().

  • The type of a field must be supported by a registered serializer.

So you should replace

case class Test(a: String, b: Int, c: String, d: Long)

with

import scala.beans.BeanProperty

case class Test(
                 @BeanProperty var a: String,
                 @BeanProperty var b: Int,
                 @BeanProperty var c: String,
                 @BeanProperty var d: Long) {
  def this() = {
    this(null, 0, null, 0)
  }
}
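
As a quick sanity check, the effect of these changes can be inspected with plain Java reflection. This is only a minimal sketch, independent of Flink: PojoCheck, Plain and the two helper methods are names made up for illustration, and it checks just the no-argument constructor and the getter/setter naming quoted above, not everything Flink's type extraction looks at.

```scala
import scala.beans.BeanProperty

object PojoCheck {
  // Ordinary case class, for contrast.
  case class Plain(a: String, b: Int)

  // Same shape as the rewritten class above.
  case class Test(
      @BeanProperty var a: String,
      @BeanProperty var b: Int,
      @BeanProperty var c: String,
      @BeanProperty var d: Long) {
    def this() = this(null, 0, null, 0)
  }

  // True if the class declares a public constructor without parameters.
  def hasNoArgConstructor(cls: Class[_]): Boolean =
    cls.getConstructors.exists(_.getParameterCount == 0)

  // True if public getFoo/setFoo methods exist for the given field name.
  def hasBeanAccessors(cls: Class[_], field: String): Boolean = {
    val suffix = field.capitalize
    val names = cls.getMethods.map(_.getName).toSet
    names("get" + suffix) && names("set" + suffix)
  }

  def main(args: Array[String]): Unit = {
    val fields = Seq("a", "b", "c", "d")
    println(hasNoArgConstructor(classOf[Plain]))               // false
    println(hasNoArgConstructor(classOf[Test]))                // true
    println(fields.forall(hasBeanAccessors(classOf[Test], _))) // true
  }
}
```

The plain case class exposes a(), b() and so on, but no getA()/setA(), which is why @BeanProperty and var are both needed.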

The second issue may be that Flink doesn't accept POJOs that are non-static inner classes, whereas the reflective toolbox generates a local class nested inside a method

https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#rules-for-pojo-types

Rules for POJO types

Flink recognizes a data type as a POJO type (and allows “by-name” field referencing) if the following conditions are fulfilled:

  • The class is public and standalone (no non-static inner class)
  • The class has a public no-argument constructor
  • All non-static, non-transient fields in the class (and all superclasses) are either public (and non-final) or have a public getter- and a setter- method that follows the Java beans naming conventions for getters and setters.

Here is the decompiled version of the toolbox-generated code

public final class __wrapper$1$a077cb72a4ee423291aac7dfb47454b9$ {

   public Object wrapper() {
      new LazyRef();

      class Test$1 implements Product, Serializable {
         private String a;
         private int b;
         private String c;
         private long d;

         ...
      }

      return scala.reflect.package..MODULE$.classTag(scala.reflect.ClassTag..MODULE$.apply(Test$1.class)).runtimeClass();
   }

   ...
}

The full decompiled code:

https://gist.github.com/DmytroMitin/f1554ad833ea1bb9eb97947ae872d220

So, if it's really necessary to generate a class for Flink, it may have to be generated manually (by compiling it as a standalone class) rather than via the toolbox

https://www.reddit.com/r/scala/comments/gfcmul/compile_scala_source_from_string_and/

https://www.reddit.com/r/scala/comments/jckld2/is_there_a_way_to_load_scala_code_at_runtime/

How to eval code that uses InterfaceStability annotation (that fails with "illegal cyclic reference involving class InterfaceStability")?

How do I programmatically compile and instantiate a Java class?

Dynamic compilation of multiple Scala classes at runtime

Tensorflow in Scala reflection

But the code with a class generated manually

https://gist.github.com/DmytroMitin/e33cd244b37f9b33b67f7ac3e6609d39

still throws This type (GenericType<java.lang.Object>) cannot be used as key.

I guess the reason for that is the following (and this is the third issue).

The code with an ordinary case class (not generated) seems to work

https://gist.github.com/DmytroMitin/af426d4578dd5e76c9e0d344e6f079ce

But if we replace type Test with Any then it throws This type (GenericType<java.lang.Object>) cannot be used as key

https://gist.github.com/DmytroMitin/a23e45a546790630e838e60c7206adcd

And with reflection, the static return type can't be anything but Any, because the generated class is not known at compile time.
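
The gap between the static type and the runtime class can be seen without Flink. In the sketch below, ClassTag merely stands in for any type information that is resolved implicitly at compile time (as far as I understand, the Scala API derives TypeInformation this way); StaticVsRuntime, staticClassOf and viaReflection are illustrative names, not Flink API.

```scala
import scala.reflect.ClassTag

object StaticVsRuntime {
  case class Test(a: String, b: Int, c: String, d: Long)

  // Resolved at compile time from the static type T, not from the value itself.
  def staticClassOf[T](value: T)(implicit tag: ClassTag[T]): Class[_] =
    tag.runtimeClass

  // A reflective constructor call can only be typed as Any.
  def viaReflection(args: AnyRef*): Any =
    classOf[Test].getConstructors()(0).newInstance(args: _*)

  def main(args: Array[String]): Unit = {
    val obj = viaReflection("aaa", Int.box(1), "ccc", Long.box(2L))
    println(obj.getClass == classOf[Test])            // true: the runtime class is Test
    println(staticClassOf(obj) == classOf[AnyRef])    // true: the static type is only Any
  }
}
```

So the object really is a Test at runtime, but everything derived from the static type sees java.lang.Object, which matches the GenericType<java.lang.Object> in the error message.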


Now I'm creating TypeInformation[Test] inside the generated code. This seems to fix This type (GenericType<java.lang.Object>) cannot be used as key, but now I get

org.apache.flink.api.common.InvalidProgramException: UTF-8 is not serializable. 
The object probably contains or references non serializable fields.

https://gist.github.com/DmytroMitin/16d312dbafeae54518f7ac2c490426b0


I resolved the issue with InvalidProgramException: UTF-8 is not serializable by annotating the fields of MapFunc with @transient

https://gist.github.com/DmytroMitin/f2f859273075370c4687a30e0c3a2431
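
My guess is that the message names UTF-8 because a java.nio.charset.Charset becomes reachable from the function object, and Charset does not implement java.io.Serializable. The following sketch reproduces the effect with plain Java serialization rather than with Flink's own serializability check; Holder, TransientHolder and isSerializable are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.nio.charset.{Charset, StandardCharsets}

object TransientDemo {
  // Serializable class that holds a non-serializable Charset.
  class Holder extends Serializable {
    var charset: Charset = StandardCharsets.UTF_8
  }

  // Same field, but marked @transient so serialization skips it.
  class TransientHolder extends Serializable {
    @transient var charset: Charset = StandardCharsets.UTF_8
  }

  // Tries Java serialization into an in-memory buffer.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    println(isSerializable(new Holder))          // false
    println(isSerializable(new TransientHolder)) // true
  }
}
```

A @transient field is null after deserialization, so it has to be initialized again on the worker, for example in open().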


Actually, it turns out that if we create the TypeInformation inside the generated code, then the toolbox is enough

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import scala.reflect.runtime
import scala.reflect.runtime.universe._
import scala.tools.reflect.ToolBox

object App {
  val toolbox = ToolBox(runtime.currentMirror).mkToolBox()

  class MapFunc extends RichMapFunction[String, Any] {
    var typeInfo: TypeInformation[_] = _
    @transient var classSymbol: ClassSymbol = _

    override def open(parameters: Configuration): Unit = {
      val code =
        """|case class Test(
           |                 @scala.beans.BeanProperty var a: String,
           |                 @scala.beans.BeanProperty var b: Int,
           |                 @scala.beans.BeanProperty var c: String,
           |                 @scala.beans.BeanProperty var d: Long) {
           |  def this() = {
           |    this(null, 0, null, 0)
           |  }
           |}""".stripMargin

      val tree = toolbox.parse(code)
      classSymbol = toolbox.define(tree.asInstanceOf[ImplDef]).asClass
      typeInfo = toolbox.eval(
        q"org.apache.flink.api.common.typeinfo.TypeInformation.of(classOf[${classSymbol.toType}])"
      ).asInstanceOf[TypeInformation[_]]
    }

    override def map(value: String): Any = {
      val values = Seq("aaa", 1, "ccc", 2L) //hardcoded for now
      createClassInstance(classSymbol, values: _*)
    }
  }


  def main(args: Array[String]): Unit = {
    val func = new MapFunc
    func.open(new Configuration)
    val classInstance = func.map("""{a: "aaa", b: 1, c: "ccc", d: 2}""")
    println(classInstance) //Test(aaa,1,ccc,2)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("localhost", 9999)
    val typeInfo = func.typeInfo.asInstanceOf[TypeInformation[Any]]
    println(typeInfo)//PojoType<__wrapper$1$75434c8e32f541f7a87513a2ad2aa0ce.Test, fields = [a: String, b: Integer, c: String, d: Long]>
    val res = stream.map(func)(typeInfo).keyBy("a", "c").sum("b")
    println(res)//org.apache.flink.streaming.api.scala.DataStream@5927f904
  }

  def createClassInstance(classSymbol: ClassSymbol, args: Any*): Any = {
    val runtimeMirror = toolbox.mirror
    val classType = classSymbol.typeSignature
    val constructorSymbol = classType.decl(termNames.CONSTRUCTOR).alternatives.head.asMethod
    val classMirror = runtimeMirror.reflectClass(classSymbol)
    val constructorMirror = classMirror.reflectConstructor(constructorSymbol)
    constructorMirror(args: _*)
  }
}
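
The values in map are hardcoded above. One possible way to derive them from the columns property of the question is sketched below. To keep the example free of a JSON dependency, a Map[String, String] stands in for the parsed JSON object; ColumnSpec, parseSpec and coerce are illustrative names and not part of the code above.

```scala
object ColumnSpec {
  // "a:String,b:Int" -> List(("a", "string"), ("b", "int"))
  def parseSpec(spec: String): List[(String, String)] =
    spec.split(",").toList.map { entry =>
      val Array(name, tpe) = entry.trim.split(":", 2)
      (name.trim, tpe.trim.toLowerCase)
    }

  // Converts raw field values into constructor arguments, in column order.
  // Fails on a missing field or an unsupported type instead of passing null.
  def coerce(spec: List[(String, String)], raw: Map[String, String]): List[Any] =
    spec.map {
      case (name, "string") => raw(name)
      case (name, "int")    => raw(name).toInt
      case (name, "long")   => raw(name).toLong
      case (name, other) =>
        throw new IllegalArgumentException(s"Unsupported type '$other' for column '$name'")
    }

  def main(args: Array[String]): Unit = {
    val spec = parseSpec("a:String,b:Int,c:String,d:Long")
    val raw = Map("a" -> "aaa", "b" -> "1", "c" -> "ccc", "d" -> "2")
    println(coerce(spec, raw)) // List(aaa, 1, ccc, 2)
  }
}
```

The resulting list could then be passed on as createClassInstance(classSymbol, values: _*).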