I am trying to migrate a schema registry from a local cluster to Confluent Cloud using Replicator, as specified in the tutorial. It manages to copy all subjects except a few, and I cannot figure out why... It spits out the following error:
ERROR Failed to translate schema registry record (io.confluent.connect.replicator.schemas.SchemaTranslator:188)
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Error while registering schema; error code: 500; error code: 500
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:495)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:486)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:481)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:212)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:267)
at io.confluent.connect.replicator.schemas.SchemaTranslator.translateSchema(SchemaTranslator.java:211)
at io.confluent.connect.replicator.schemas.SchemaTranslator.translateSchemas(SchemaTranslator.java:166)
at io.confluent.connect.replicator.schemas.SchemaTranslator.translateCollectedRecords(SchemaTranslator.java:135)
at io.confluent.connect.replicator.ReplicatorSourceTask.translateCollectedRecords(ReplicatorSourceTask.java:566)
at io.confluent.connect.replicator.ReplicatorSourceTask.poll(ReplicatorSourceTask.java:558)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:289)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
The schemas that it does not manage to copy do not look any different from those that it does copy.
How can I debug it?
I am using Confluent platform 6.0.0.
connect-standalone.properties:
#value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter you want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Store offsets on local filesystem
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=5000
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=20000
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="****" password="****";
security.protocol=SASL_SSL
consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="****" password="****";
consumer.security.protocol=SASL_SSL
consumer.offset.start=earliest
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="****" password="****";
producer.security.protocol=SASL_SSL
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
quickstart-replicator.properties:
# basic connector configuration
name=replicator-v28
connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector
key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
value.converter=io.confluent.connect.replicator.util.ByteArrayConverter
header.converter=io.confluent.connect.replicator.util.ByteArrayConverter
tasks.max=4
# source cluster connection info
src.kafka.bootstrap.servers=localhost:9092
# destination cluster connection info
dest.kafka.ssl.endpoint.identification.algorithm=https
dest.kafka.sasl.mechanism=PLAIN
dest.kafka.request.timeout.ms=20000
dest.kafka.bootstrap.servers=<kafka-cloud-url>:9092
retry.backoff.ms=500
dest.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="****" password="****";
dest.kafka.security.protocol=SASL_SSL
# Schema Registry migration topics to replicate from source to destination
# topic.whitelist indicates which topics are of interest to replicator
topic.whitelist=_schemas
# schema.registry.topic indicates which of the topics in the ``whitelist`` contains schemas
schema.registry.topic=_schemas
# Connection settings for destination Confluent Cloud Schema Registry
schema.registry.url=https://<cloud-schema-registry-url>
schema.registry.client.basic.auth.credentials.source=USER_INFO
schema.registry.client.basic.auth.user.info=****:****
confluent.topic.replication.factor=3
As the log states, the problem is that the client could not register a schema in the destination schema registry.
The way to debug it is to set the DEBUG logging level for Replicator. For instance, you can change the log level in ~/confluent-6.0.0/etc/kafka/connect-log4j.properties. You then need to restart Replicator and write its logs to a file.
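A minimal sketch of what those two steps could look like, assuming the default Confluent Platform 6.0.0 layout (the exact logger name and file paths are assumptions; any DEBUG-level logger covering io.confluent.connect.replicator will do):

# ~/confluent-6.0.0/etc/kafka/connect-log4j.properties
# either raise the root logger to DEBUG ...
log4j.rootLogger=DEBUG, stdout
# ... or, less noisy, enable DEBUG only for the Replicator classes
log4j.logger.io.confluent.connect.replicator=DEBUG

# restart the standalone worker and capture its output in a file
./bin/connect-standalone ./etc/kafka/connect-standalone.properties \
    ./etc/kafka-connect-replicator/quickstart-replicator.properties > debug.txt 2>&1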
In debug.txt you can find which subject could not be POSTed to the schema registry, and you can then try to register it manually. Most likely, this happens because the destination schema registry already contains a subject with the same ID. Note that if you manually delete a subject, it still persists in the schema registry, it is just no longer visible. To completely get rid of a subject, you need to do a "hard delete" of it.
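For example, assuming the failing subject is called my-subject-value and using the Schema Registry credentials from the connector config (the subject name and schema body below are placeholders, not taken from the question):

# list subjects in the destination registry; add ?deleted=true to also see soft-deleted ones
curl -u "<sr-api-key>:<sr-api-secret>" "https://<cloud-schema-registry-url>/subjects?deleted=true"

# try to register the schema for the failing subject manually
curl -u "<sr-api-key>:<sr-api-secret>" \
    -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    "https://<cloud-schema-registry-url>/subjects/my-subject-value/versions"

# a plain DELETE is only a soft delete; repeat it with permanent=true for a hard delete
curl -u "<sr-api-key>:<sr-api-secret>" -X DELETE "https://<cloud-schema-registry-url>/subjects/my-subject-value"
curl -u "<sr-api-key>:<sr-api-secret>" -X DELETE "https://<cloud-schema-registry-url>/subjects/my-subject-value?permanent=true"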