I am using the Redis Kafka sink connector to consume messages from a Kafka topic and update a Redis database. Whenever a new message is consumed, the Redis Kafka sink connector upserts the message's value into Redis as JSON, and it also upserts a String into Redis with the key com.redis.kafka.connect.sink.$Key and a value along the lines of {"topic":"$Key","partition":0,"offset":9}. A picture of this in RedisInsight is here. The JSON upsert into Redis is exactly what I expected, but I did not expect this additional String upsert to happen.
Why is this String upsert of the last consumed offset occurring, and how can I prevent it? Or is this upsert into Redis necessary so the Redis Kafka sink connector remembers the last offset it has consumed?
I have reviewed the Redis Kafka Connector documentation, searched through various SO and Redis Forum posts, and tried the Docker example on the connector's GitHub, but I did not find an explanation of why the Redis Kafka connector upserts the last consumed offset into Redis.
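For reference, this is roughly what the extra keys look like when I inspect them with redis-cli ($Key appears to be the topic name, dbserver1.public.demo, in my case; the offset value is just an example from my run):

# list the unexpected String keys written by the sink connector
redis-cli --scan --pattern 'com.redis.kafka.connect.sink.*'
# com.redis.kafka.connect.sink.dbserver1.public.demo

# read one of them back
redis-cli GET com.redis.kafka.connect.sink.dbserver1.public.demo
# "{\"topic\":\"dbserver1.public.demo\",\"partition\":0,\"offset\":9}"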
I am running Kafka, Zookeeper, and Kafka Connect in Docker. A Debezium Kafka Connect connector is producing events to Kafka by reading a local PostgreSQL database's write-ahead log. The docker compose file is:
version: "2"
services:
zookeeper:
image: quay.io/debezium/zookeeper:2.4
container_name: zookeeper
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: quay.io/debezium/kafka:2.4
container_name: kafka
ports:
- 29092:29092
links:
- zookeeper
environment:
ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://kafka:29092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
connect:
image: quay.io/debezium/connect:2.4
container_name: connect
ports:
- 8083:8083
links:
- kafka
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
volumes:
- ./plugins/redis-redis-kafka-connect-0.9.0:/kafka/connect/redis-connector
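I bring the stack up like this and confirm that Kafka Connect picked up the Redis connector plugin from the mounted /kafka/connect directory (the /connector-plugins call is the standard Kafka Connect REST API, exposed on port 8083 above):

docker compose up -d

# verify the Redis sink connector plugin was loaded
curl -s http://localhost:8083/connector-plugins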
The configuration for the Debezium Postgres source connector is:
{
  "name": "postgres-source-connector",
  "config": {
    "plugin.name": "pgoutput",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "6.tcp.ngrok.io",
    "database.port": "18384",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "test",
    "topic.prefix": "dbserver1"
  }
}
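I register the source connector through the Kafka Connect REST API (postgres-source.json is just the name I give the config above):

# save the JSON above as postgres-source.json, then:
curl -s -X POST -H "Content-Type: application/json" \
  --data @postgres-source.json \
  http://localhost:8083/connectors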
The configuration for the Redis sink connector is:
{
  "name": "redis-sink-connector",
  "config": {
    "connector.class": "com.redis.kafka.connect.RedisSinkConnector",
    "tasks.max": "1",
    "redis.command": "JSONSET",
    "redis.uri": "redis_uri",
    "redis.username": "redis_username",
    "redis.password": "redis_password",
    "topics": "dbserver1.public.demo",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
  }
}
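The sink connector is registered the same way, and its status looks healthy afterwards (redis-sink.json being the config above; both endpoints are standard Kafka Connect REST API):

# save the JSON above as redis-sink.json, then:
curl -s -X POST -H "Content-Type: application/json" \
  --data @redis-sink.json \
  http://localhost:8083/connectors

# the task reports RUNNING
curl -s http://localhost:8083/connectors/redis-sink-connector/status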
Thank you in advance for any help!