Is there a way to use the Kafka Schema Registry without the magic byte?


I'm trying to make my applications work with the schema registry from Confluent, but at this point I'm not in total control of the producers. You can even see them as legacy applications that simply are not bound to the Confluent products.

I was looking at the Confluent documentation, and it seems all messages should include a magic byte and schema ID in the payload:
https://docs.confluent.io/3.2.0/schema-registry/docs/serializer-formatter.html
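From what I can tell, the framing is a 0x0 magic byte, then the schema ID as a 4-byte big-endian integer, then the serialized payload (for Protobuf, Confluent's format additionally puts message-index bytes between the ID and the payload). A minimal sketch of what I mean, with schemaId and payload as placeholders:

    import java.nio.ByteBuffer;

    // Sketch of the Confluent wire format described above.
    public class WireFormat {
        public static byte[] frame(int schemaId, byte[] payload) {
            ByteBuffer buf = ByteBuffer.allocate(1 + 4 + payload.length);
            buf.put((byte) 0x0);  // magic byte
            buf.putInt(schemaId); // 4-byte big-endian schema ID from the registry
            buf.put(payload);     // the actual Avro/Protobuf/JSON bytes
            return buf.array();
        }
    }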

If a message lacks this framing, I get an error when I try to consume it:

[2020-09-25 13:12:09,008] ERROR WorkerSinkTask{id=s3_parquet_connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:491)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:324)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:200)
            at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
            at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic com.obj_pos to Protobuf: 
            at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:123)
            at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:491)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
            ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2020-09-25 13:12:09,010] ERROR WorkerSinkTask{id=s3_parquet_connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

My question is whether there is a way to either disable this magic byte check, or to create a Kafka Streams application that would simply prepend these 5 bytes to the original message, so that afterwards I could consume it with a consumer that connects to the schema registry.

What is happening is that the producer is out of my control, so I would need some way to deserialize messages that do not contain those 5 bytes, because they are produced by producers that don't rely on the Confluent serializers/deserializers.
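To illustrate the Kafka Streams idea, here is a rough sketch of what I have in mind: a small application that copies the raw bytes from the legacy topic and prepends the header. Topic names and SCHEMA_ID are made up; the real ID would have to be looked up in the registry, and since my data is Protobuf I also prepend the message-index byte (a single 0x0 when the first message type in the schema is used).

    import java.nio.ByteBuffer;
    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Produced;

    public class MagicByteInjector {
        private static final int SCHEMA_ID = 1; // placeholder: ID of the schema in the registry

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "magic-byte-injector");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("com.obj_pos", Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
                   // prepend magic byte + schema ID (+ Protobuf message index) to every value
                   .mapValues(value -> ByteBuffer.allocate(6 + value.length)
                           .put((byte) 0x0)   // magic byte
                           .putInt(SCHEMA_ID) // 4-byte big-endian schema ID
                           .put((byte) 0x0)   // Protobuf message index (first message type)
                           .put(value)
                           .array())
                   .to("com.obj_pos.framed", Produced.with(Serdes.ByteArray(), Serdes.ByteArray()));

            new KafkaStreams(builder.build(), props).start();
        }
    }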


There is 1 answer

OneCricketeer (best answer):

they are produced by producers that don't rely on the confluent serializers

Then the problem isn't the Registry.

You shouldn't be using the Converters written by Confluent to consume the messages, as those are bound to the Registry, and there is no way to skip it.

You would instead use the BlueApron ones (assuming the data really is Protobuf), or write your own Converter classes.
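With the BlueApron kafka-connect-protobuf-converter, the connector settings would look roughly like this (the proto class name is a placeholder for your own compiled message class):

    value.converter=com.blueapron.connect.protobuf.ProtobufConverter
    value.converter.protoClassName=com.example.protos.ObjPos

If you write your own, the entry point is Connect's Converter interface. Here is a minimal registry-free sketch that just passes raw bytes through, which is essentially what the built-in org.apache.kafka.connect.converters.ByteArrayConverter already does; a real implementation would parse the Protobuf payload into a Connect Struct instead:

    import java.util.Map;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.storage.Converter;

    // Placeholder class name, for illustration only.
    public class RawBytesConverter implements Converter {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) { }

        @Override
        public byte[] fromConnectData(String topic, Schema schema, Object value) {
            return (byte[]) value; // write path: bytes pass through unchanged
        }

        @Override
        public SchemaAndValue toConnectData(String topic, byte[] value) {
            // read path: no magic byte expected, no registry lookup
            return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value);
        }
    }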