I am having an issue with Kafka in my REPL:
Execution error (NullPointerException) at kafka_component.consumer.KafkaConsumer/stop (consumer.clj:21).
Cannot invoke "Object.getClass()" because "target" is null
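A minimal sketch of how this message can arise, assuming `:conn` is nil when `stop` runs (for example because `start` never ran or failed). The names `component-without-conn` and `shutdown-conn` are hypothetical stand-ins, not part of the real code. Calling a method reflectively on nil makes Clojure's reflector throw a NullPointerException; on recent JDKs the message should resemble the one above, though the exact text depends on the JVM version.

```clojure
;; Hypothetical stand-in for a KafkaConsumer record that was never started,
;; so it has a :config but no :conn.
(def component-without-conn {:config {}})

(defn shutdown-conn
  "Mirrors the (.shutdown (:conn this)) call from stop."
  [component]
  (.shutdown (:conn component)))

(def result
  (try
    (shutdown-conn component-without-conn)
    :no-error
    (catch NullPointerException e
      ;; The message text varies by JDK version.
      [:npe (.getMessage e)])))

(println result)
```

If this reproduces the same message, the problem may lie in the component lifecycle (stopping something that was never started) rather than in the Kafka image.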
My yml file, which includes Kafka and ZooKeeper, is this:
zookeeper:
  image: confluentinc/cp-zookeeper
  restart: unless-stopped
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000
  ports:
    - "2181:2181"
  container_name: dev_zookeeper_1
kafka:
  image: confluentinc/cp-kafka
  restart: unless-stopped
  ports:
    - "9092:9092"
  depends_on:
    - zookeeper
  environment:
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
    KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_REPLICA_FETCH_MAX_BYTES: 30971520
    KAFKA_MESSAGE_MAX_BYTES: 30971520
    KAFKA_LOG_RETENTION_HOURS: 1440
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock
    - kafka-data:/kafka
  container_name: dev_kafka_1
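The `ports` mapping above publishes ZooKeeper's 2181 and Kafka's 9092 on the host. A rough way to check from the REPL whether a given address is actually reachable is a plain TCP connect. This is only a sketch: `reachable?` is a hypothetical helper, and a successful connect shows that the port is open, not that the service behind it is healthy.

```clojure
(import '(java.net InetSocketAddress Socket))

(defn reachable?
  "Returns true if a TCP connection to host:port succeeds within timeout-ms,
  false if the connection is refused, times out, or the host cannot be resolved."
  [host port timeout-ms]
  (try
    (with-open [s (Socket.)]
      (.connect s (InetSocketAddress. ^String host (int port)) (int timeout-ms))
      true)
    (catch java.io.IOException _
      false)))
```

For example, `(reachable? "localhost" 2181 1000)` could be compared against the host and port that the consumer is given as its ZooKeeper address.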
And my consumer is:
(ns kafka-component.consumer
  (:require
   [com.stuartsierra.component :as component]
   [clojure.tools.reader.edn :as edn]
   [clj-kafka.zk :as zk]
   [clj-kafka.consumer.zk :as c]))

(defrecord KafkaConsumer [config conn]
  component/Lifecycle
  (start [this]
    (assoc this :conn (c/consumer (:config this))))
  (stop [this]
    (.shutdown (:conn this))
    (dissoc this :conn)))

(defn new-kafka-consumer
  ([]
   (new-kafka-consumer "192.168.59.103:2181" "clj-kafka.consumer"))
  ([zk-addr group-id]
   (new-kafka-consumer zk-addr group-id {}))
  ([zk-addr group-id kafka-config]
   (map->KafkaConsumer
    {:config (merge kafka-config
                    {"zookeeper.connect" zk-addr
                     "group.id" group-id})})))

(defn- rewind!
  "reset offsets of all partitions of a topic back to 0"
  [config topic]
  (let [parts (keys (zk/partitions config topic))
        group (get config "group.id")]
    (doseq [p parts]
      (zk/set-offset! config group topic p 0))))

(defn from-beginning
  "return a lazy sequence of messages from the beginning of a topic"
  [{:keys [config conn]} topic]
  (rewind! config topic)
  (c/messages conn topic))

(defn- from-bytes
  [^bytes b]
  (String. b "UTF-8"))

(defn load-value
  "Load a clojure data structure from a consumed message"
  [{:keys [value]}]
  (try
    (edn/read-string (from-bytes value))
    (catch Exception e
      (println (str "load-value exception: " (.getMessage e))))))

(defn load-text
  [{:keys [value]}]
  (try
    (from-bytes value)
    (catch Exception e
      (println (str "load-text exception: " (.getMessage e))))))
I have an M1 Mac, so the architecture is ARM64. I have tried different Kafka images, such as bitnami and wurstmeister, but the error persists. Any help with this issue would be much appreciated.
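Since the stack trace points at `stop` and the error does not change with the image, one thing that may be worth trying is a `stop` that tolerates a missing connection. The sketch below is self-contained and makes two assumptions that are not in the original code: a local `Lifecycle` protocol stands in for `component/Lifecycle`, and a Java `ExecutorService` stands in for the object returned by `c/consumer` (it is used only because it also has a `.shutdown` method).

```clojure
(import '(java.util.concurrent Executors ExecutorService))

;; Hypothetical stand-in for component/Lifecycle, so the sketch runs
;; without any dependencies.
(defprotocol Lifecycle
  (start [this])
  (stop [this]))

(defrecord SafeConsumer [config conn]
  Lifecycle
  (start [this]
    ;; Stand-in for (c/consumer (:config this)).
    (assoc this :conn (Executors/newSingleThreadExecutor)))
  (stop [this]
    ;; Call .shutdown only if start actually produced a connection.
    (when-let [^ExecutorService c (:conn this)]
      (.shutdown c))
    ;; assoc nil instead of dissoc: dissoc on a declared record field
    ;; returns a plain map, not a record.
    (assoc this :conn nil)))

(def never-started (->SafeConsumer {} nil))
(def stopped-early (stop never-started)) ; no exception, :conn stays nil

(def started (start never-started))
(def stopped (stop started))
```

With this shape, calling `stop` on a component that was never started (or whose `start` failed) is a no-op instead of a NullPointerException. It would not explain why `:conn` is nil in the first place.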