I am using Kafka Connect with the Google Pub/Sub connector to write messages from GCP Pub/Sub into Kafka topics.
My Connector has the following configuration:
{
  "name": "MyTopicSourceConnector",
  "config": {
    "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
    "tasks.max": "10",
    "kafka.topic": "myTopic",
    "cps.project": "my-project-id",
    "cps.subscription": "myTopic-sub",
    "name": "MyTopicSourceConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schema.registry.url": "http://myurl-schema-registry:8081",
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "http://myurl-schema-registry:8081"
  }
}
The proto message value schema looks like this:
syntax = "proto3";

message value_myTopic {
  bytes message = 1;
  string notificationConfig = 2;
  string eventTime = 3;
  string bucketId = 4;
  string payloadFormat = 5;
  string eventType = 6;
  string objectId = 7;
  string objectGeneration = 8;
}
This setup works when I use Avro or JSON (with the appropriate converters), but with Protobuf the connector fails right after I deploy it, with the following error:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:292)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:245)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Unsupported root schema of type STRING
at io.confluent.connect.protobuf.ProtobufData.rawSchemaFromConnectSchema(ProtobufData.java:315)
at io.confluent.connect.protobuf.ProtobufData.fromConnectSchema(ProtobufData.java:304)
at io.confluent.connect.protobuf.ProtobufData.fromConnectData(ProtobufData.java:109)
at io.confluent.connect.protobuf.ProtobufConverter.fromConnectData(ProtobufConverter.java:83)
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:292)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 11 more
I don't know why you are running into this issue. If you are interested, there is a community alternative to the Confluent Protobuf converter, from Blue Apron. It just requires one additional config setting to specify which Protobuf (de)serialization class to use.
An example from Snowflake showing both converters can be seen here.
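As a rough sketch of what that could look like for the connector in the question, the value converter is swapped for the Blue Apron one and given the extra class setting (`protoClassName`, as I remember it from the Blue Apron README, so please double-check it there). The class name `com.example.protos.MyTopicProtos$ValueMyTopic` is purely hypothetical; it stands in for whatever Java class you generate from your own .proto file, and that class has to be on the Connect worker's plugin path. The key converter lines are left out here, so the worker's default key converter would apply. I have not tested this against the Pub/Sub connector.

```json
{
  "name": "MyTopicSourceConnector",
  "config": {
    "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
    "tasks.max": "10",
    "kafka.topic": "myTopic",
    "cps.project": "my-project-id",
    "cps.subscription": "myTopic-sub",
    "value.converter": "com.blueapron.connect.protobuf.ProtobufConverter",
    "value.converter.protoClassName": "com.example.protos.MyTopicProtos$ValueMyTopic"
  }
}
```

Note that this converter works from the compiled Protobuf class rather than from Schema Registry, so the `schema.registry.url` settings are not used for it.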