Kafka Connect - Redis Sink Connector doing String upsert with last consumed message offset


I am using the Redis Kafka sink connector to consume messages from a Kafka topic and update a Redis database. Whenever a new message is consumed, the Redis Kafka sink connector upserts the message's value into Redis as JSON, and it also upserts a String in the Redis database with the key com.redis.kafka.connect.sink.$Key and a value along the lines of {"topic":"$Key","partition":0,"offset":9}. A picture of this in RedisInsight is here. The JSON upsert into Redis is exactly what I expected, but I did not expect this additional String upsert to happen.
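
To make the shape of that extra entry concrete, here is a minimal Python sketch of how the key and value I am seeing could be built and parsed. It assumes that $Key is the topic name and that the prefix is fixed; both are inferred from what RedisInsight shows, not taken from the connector's documentation. The names OFFSET_KEY_PREFIX, offset_key, and parse_offset_record are hypothetical helpers for illustration only.

```python
import json
from typing import Tuple

# Prefix as observed in RedisInsight; an assumption, not a documented constant.
OFFSET_KEY_PREFIX = "com.redis.kafka.connect.sink."


def offset_key(topic: str) -> str:
    """Build the Redis key under which the extra String appears (hypothetical helper)."""
    return OFFSET_KEY_PREFIX + topic


def parse_offset_record(raw: str) -> Tuple[str, int, int]:
    """Parse the observed String value into (topic, partition, offset)."""
    record = json.loads(raw)
    return record["topic"], int(record["partition"]), int(record["offset"])


# Value shaped like the one in the question, using the sink's topic name.
raw_value = '{"topic":"dbserver1.public.demo","partition":0,"offset":9}'

print(offset_key("dbserver1.public.demo"))
print(parse_offset_record(raw_value))
```

Reading the String this way only confirms what it contains (topic, partition, and last consumed offset); it does not tell me whether the connector depends on it.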

Why is this String upsert of the last consumed offset occurring, and how can I prevent it? Or is this upsert necessary so that the Redis Kafka sink connector remembers the last offset it has consumed?

I have reviewed the Redis Kafka Connector documentation, searched through various Stack Overflow and Redis Forum posts, and tried the Docker example on the connector's GitHub, but I did not find an explanation of why the Redis Kafka connector upserts the last consumed offset into Redis.

I am running Kafka, ZooKeeper, and Kafka Connect in Docker. A Debezium Kafka Connect connector produces events to Kafka by reading the write-ahead log of a local PostgreSQL database. The Docker Compose file is:

version: "2"
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:2.4
    container_name: zookeeper
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:2.4
    container_name: kafka
    ports:
      - 29092:29092
    links:
      - zookeeper
    environment:
      ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://kafka:29092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
  connect:
    image: quay.io/debezium/connect:2.4
    container_name: connect
    ports:
      - 8083:8083
    links:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    volumes:
      - ./plugins/redis-redis-kafka-connect-0.9.0:/kafka/connect/redis-connector

The configuration for the Debezium Postgres source connector is:

{
  "name": "postgres-source-connector",
  "config": {
    "plugin.name": "pgoutput",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "6.tcp.ngrok.io",
    "database.port": "18384",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "test",
    "topic.prefix": "dbserver1"
  }
}

The configuration for the Redis sink connector is:

{
  "name": "redis-sink-connector",
  "config": {
    "connector.class": "com.redis.kafka.connect.RedisSinkConnector",
    "tasks.max": "1",
    "redis.command": "JSONSET",
    "redis.uri": "redis_uri",
    "redis.username": "redis_username",
    "redis.password": "redis_password",
    "topics": "dbserver1.public.demo",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
  }
}

Thank you in advance for any help!
