Flink: Key serializer used in Java DataSet API incompatible with that used in Scala DataStream API

Our Flink code (currently on Flink 1.12) is written in Scala and generally consists of a bunch of keyed time windows and process functions as operators. We have externalized savepoint storage to allow for state restoration upon failure. Recently, we decided to use Flink's state processor API to allow for state manipulation and migration when, e.g., the logic in the keyed operators changes.

In Flink 1.12, the state processor API is implemented using Flink's DataSet API, and is only available in Java. While this does not pose any issues when the keys are of primitive type, we noticed that there is an incompatibility between the key serializer used for the DataSet API (defaults to the KryoSerializer) and that used in our Scala code (uses ScalaCaseClassSerializer).

Here's a small example to explain what I mean:

A Flink bot (written in Scala using Flink's Scala wrappers):

import org.apache.flink.configuration.{CheckpointingOptions, Configuration}
import org.apache.flink.streaming.api.scala._

case class Key(prefix: String, suffix: String)
case class KeyValue[A](key: Key, value: A)

def main(args: Array[String]): Unit = {

  val checkpointDir = "/path/to/checkpoint/dir" // (or read from `args`)
  val flinkConfig: Configuration = {
    val conf = new Configuration()
    conf.setString(CheckpointingOptions.STATE_BACKEND, "filesystem")
    conf.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, s"file://$checkpointDir")
    conf.setString("execution.checkpointing.interval", "5s")
    conf.setString(
      "execution.checkpointing.externalized-checkpoint-retention",
      "RETAIN_ON_CANCELLATION"
    )
    conf
  }

  val env = StreamExecutionEnvironment.createLocalEnvironment(1, flinkConfig)

  env
    .addSource(new KafkaSource[KeyValue[Int]]) // returns DataStream[KeyValue[Int]]
    .uid("event-source")
    .keyBy(_.key)
    .process(new CustomAggregationFunction) // Implementation not relevant
    .uid("aggregate")
    .printToErr
    .uid("print-to-console")

  env.execute()
}

A Flink state reader (also written in Scala, although Flink's state processor API is built on the Java version of the DataSet API):

import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.state.api.Savepoint

def main(args: Array[String]): Unit = {
  val checkpointDir = "/path/to/above/checkpoint/dir" // (or read from `args`)
  val env = ExecutionEnvironment.getExecutionEnvironment
  val backend: StateBackend = new MemoryStateBackend()

  val savepoint = Savepoint.load(env, checkpointDir, backend)

  savepoint
    .readKeyedState("aggregate", new StateReaderFunction) // Reads state of `CustomAggregationFunction`
    .printToErr()

  env.execute()
}
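
For completeness, here is roughly what the StateReaderFunction referenced above could look like. This is only a sketch under the assumption that CustomAggregationFunction keeps a single ValueState[Int] registered under the (hypothetical) name "sum"; its actual implementation is not what triggers the problem.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.util.Collector

class StateReaderFunction extends KeyedStateReaderFunction[Key, KeyValue[Int]] {

  private var sum: ValueState[Int] = _

  override def open(parameters: Configuration): Unit = {
    // Descriptor must match whatever CustomAggregationFunction registers ("sum" is an assumption)
    sum = getRuntimeContext.getState(new ValueStateDescriptor[Int]("sum", classOf[Int]))
  }

  override def readKey(
      key: Key,
      ctx: KeyedStateReaderFunction.Context,
      out: Collector[KeyValue[Int]]
  ): Unit =
    out.collect(KeyValue(key, sum.value()))
}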

State creation using the Flink bot works just fine (and doesn't fall back to Kryo). But when I try to read this state using Flink's state processor API, I get the following error:

org.apache.flink.util.StateMigrationException: The new key serializer (org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@4541edf4) must be compatible with the previous key serializer (org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer@d1821229).
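
The mismatch in this message can actually be observed without loading any savepoint. Here is a small illustration (not part of our code, and assuming it runs in the same project where Key is defined) that compares the serializer the Scala API derives for the key with the one the Java TypeExtractor derives for the same class:

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.scala._

val executionConfig = new ExecutionConfig

// Scala DataStream path: the TypeInformation macro derives a CaseClassTypeInfo for Key,
// whose serializer is a ScalaCaseClassSerializer.
val scalaKeySerializer = createTypeInformation[Key].createSerializer(executionConfig)
println(scalaKeySerializer.getClass.getSimpleName) // ScalaCaseClassSerializer

// Java DataSet / state processor path: the case class is not a valid POJO for the
// TypeExtractor, so it falls back to GenericTypeInfo and therefore to Kryo.
val javaKeySerializer = TypeExtractor.createTypeInfo(classOf[Key]).createSerializer(executionConfig)
println(javaKeySerializer.getClass.getSimpleName) // KryoSerializer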

As expected, this error does not occur when I use Flink's DataStream API in Java. It also does not occur in later versions of Flink (> 1.15), presumably because Flink's Scala API is deprecated there. But I would like to avoid migrating to a higher version (just yet) unless there is no other solution, since that would mean a huge amount of refactoring on our end.

I have also tried using env.getConfig.enableForceKryo() to force Kryo-based serialization of Scala objects, but that didn't help.
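
Concretely, that attempt amounted to the following in the streaming job (a sketch of where the call was placed, not an exact excerpt). Presumably it made no difference because enableForceKryo() only affects types that would otherwise be analyzed as POJOs, while the case-class key keeps going through the Scala serializers:

val env = StreamExecutionEnvironment.createLocalEnvironment(1, flinkConfig)
// Force Kryo for POJO-style types; the case-class key is still serialized by
// ScalaCaseClassSerializer, so the savepoint is unchanged in this respect.
env.getConfig.enableForceKryo()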

Is there any way to fix this incompatibility by sticking to Flink 1.12? Thanks in advance!

1 Answer

Answered by Martijn Visser:

The reason it most likely doesn't occur in Flink 1.15 and newer versions is that the State Processor API there no longer uses the DataSet API under the hood, but the DataStream API in batch execution mode. See https://issues.apache.org/jira/browse/FLINK-24912

One thing you could try is to cherry-pick that commit onto a local fork of Flink 1.12 and use that build to migrate the state.