I am trying to set up a Debezium sink connector for PostgreSQL. The architecture is: a Python script sends events to a Kafka topic (I can see them inside the topic), and a Debezium JDBC sink connector should then write those events to a PostgreSQL DB. The DB is already created with a dedicated table.
The ERROR I am getting is this:
debezium | 2024-03-11 13:53:44,431 ERROR || WorkerSinkTask{id=location-db-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
debezium | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
debezium | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
debezium | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
debezium | at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:533)
debezium | at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:513)
debezium | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:349)
debezium | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
debezium | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
debezium | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
debezium | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
debezium | at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
debezium | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
debezium | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
debezium | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
debezium | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
debezium | at java.base/java.lang.Thread.run(Thread.java:829)
debezium | Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
debezium | at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:337)
debezium | at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:91)
debezium | at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:533)
debezium | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
debezium | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
debezium | ... 14 more
debezium | 2024-03-11 13:53:44,432 INFO || Closing session. [io.debezium.connector.jdbc.JdbcChangeEventSink]
This is the event my Python script sends to Kafka:
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "string", "optional": "false", "field": "id" },
      { "type": "string", "optional": "false", "field": "type" },
      { "type": "string", "optional": "false", "field": "event_name" },
      { "type": "string", "optional": "false", "field": "event_date" },
      { "type": "string", "optional": "false", "field": "location" }
    ],
    "payload": {
      "id": "1",
      "type": "EventAdvertisement",
      "event_name": "EventABC",
      "event_date": "20230928",
      "location": "VenueXYZ"
    }
  }
}
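For completeness, here is roughly how my Python script produces the event (a minimal sketch; it assumes the kafka-python client and a broker on localhost:9092, which may differ from the real setup — the value is the exact JSON shown above):

# Minimal sketch of the producer; assumes the kafka-python client
# and a broker reachable on localhost:9092.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    # Serialize the dict to a plain JSON value, as the connector will read it.
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# The value is the exact JSON document shown above.
event = {
    "schema": {
        "type": "struct",
        "fields": [
            {"type": "string", "optional": "false", "field": "id"},
            {"type": "string", "optional": "false", "field": "type"},
            {"type": "string", "optional": "false", "field": "event_name"},
            {"type": "string", "optional": "false", "field": "event_date"},
            {"type": "string", "optional": "false", "field": "location"},
        ],
        "payload": {
            "id": "1",
            "type": "EventAdvertisement",
            "event_name": "EventABC",
            "event_date": "20230928",
            "location": "VenueXYZ",
        },
    }
}

# Topic name matches the "topics" setting in the connector config below.
producer.send("dataqualitymeasurement", value=event)
producer.flush()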
Here is my Debezium connector configuration:
{
  "name": "location-db-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://in_postgres:5432/in_postgres_db",
    "connection.username": "in_postgres_user",
    "connection.password": "in_postgres_password",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "primary.key.mode": "record_key",
    "schema.evolution": "basic",
    "database.time_zone": "UTC",
    "topics": "dataqualitymeasurement",
    "auto.create": "true"
  }
}
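I register the connector by POSTing this file to the Kafka Connect REST API, roughly like this (a sketch; it assumes Connect's REST endpoint is on localhost:8083 and uses the requests library):

# Sketch: register the connector through the Kafka Connect REST API.
# Assumes the Connect worker's REST endpoint is reachable on localhost:8083.
import json
import requests

with open("location-db-connector.json") as f:
    connector = json.load(f)  # the config shown above

resp = requests.post(
    "http://localhost:8083/connectors",
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
print(resp.status_code, resp.text)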
On the Internet I found a possible solution: adding these converter parameters:
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true"
Here is the link to the answer: Kafka Connect JDBC Sink Connector giving WorkerSinkTask ERROR
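With those parameters added, my full connector config looks like this:

{
  "name": "location-db-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://in_postgres:5432/in_postgres_db",
    "connection.username": "in_postgres_user",
    "connection.password": "in_postgres_password",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "primary.key.mode": "record_key",
    "schema.evolution": "basic",
    "database.time_zone": "UTC",
    "topics": "dataqualitymeasurement",
    "auto.create": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true"
  }
}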
With this config I still get exactly the same error. Can anyone please help me sink my data into PostgreSQL? Thanks in advance.

