Flink SQL Kafka Source Skip Failed Messages


Using the Kafka connector as a source table:

      'connector' = 'kafka',
      'topic' = 'test',   
      'format' = 'json'
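
For reference, the full source table DDL looks roughly like this (the table name, columns, and broker address are placeholders, not part of my actual definition):

    CREATE TABLE kafka_source (
        id STRING,
        payload STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'test',
        'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder broker address
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    );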

Whenever an invalid message (e.g. one that is not valid JSON) is received from the Kafka topic, a deserialization error occurs, the Flink job fails, and after recovery it gets stuck trying to consume the same invalid message over and over. If this were a Flink job written with the Java SDK, I could catch the exception and skip the failed message (move it to a DLQ, etc.).

However, in Flink SQL I don't see any option to do so. If there isn't one, I'm not sure Flink SQL is suitable for production pipelines; it must be possible to handle cases like this.

Thanks in advance

1 Answer

Answered by Niko:

I think you're probably looking for:

'json.ignore-parse-errors' = 'true'

or

'json.fail-on-missing-field' = 'false'

See the Flink JSON format documentation for the full list of options.
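
Applied to the table from the question, a minimal sketch (placeholder columns and broker address again):

    CREATE TABLE kafka_source (
        id STRING,
        payload STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'test',
        'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder broker address
        'format' = 'json',
        -- skip records that cannot be parsed as JSON instead of failing the job
        'json.ignore-parse-errors' = 'true',
        -- map missing fields to NULL instead of failing
        'json.fail-on-missing-field' = 'false'
    );

Note that with 'json.ignore-parse-errors' = 'true' the malformed records are dropped silently; as far as I know the SQL connector has no built-in dead-letter-queue option, so if you need a DLQ you would still have to drop down to the DataStream API.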