Issue with Kafka in Clojure Repl for Arm64 M1

58 views Asked by At

I am having issue with Kafka in my repl:

Execution error (NullPointerException) at kafka_component.consumer.KafkaConsumer/stop (consumer.clj:21). 
Cannot invoke "Object.getClass()" because "target" is null

My yml file including kafka and zookeeper is this:

  zookeeper:
    image: confluentinc/cp-zookeeper
    restart: unless-stopped
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
    container_name: dev_zookeeper_1

  kafka:
    image: confluentinc/cp-kafka
    restart: unless-stopped
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_REPLICA_FETCH_MAX_BYTES: 30971520
      KAFKA_MESSAGE_MAX_BYTES: 30971520
      KAFKA_LOG_RETENTION_HOURS: 1440
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - kafka-data:/kafka
    container_name: dev_kafka_1

And my consumer is

(ns kafka-component.consumer
  (:require
    [com.stuartsierra.component :as component]
    [clojure.tools.reader.edn   :as edn]
    [clj-kafka.zk               :as zk]
    [clj-kafka.consumer.zk      :as c]))

(defrecord KafkaConsumer [config conn]
  component/Lifecycle
  (start [this]
    (assoc this :conn (c/consumer (:config this))))
  (stop [this]
    (.shutdown (:conn this))
    (dissoc this :conn)))

(defn new-kafka-consumer
  ([]
   (new-kafka-consumer "192.168.59.103:2181" "clj-kafka.consumer"))
  ([zk-addr group-id]
   (new-kafka-consumer zk-addr group-id {}))
  ([zk-addr group-id kafka-config]
   (map->KafkaConsumer
     {:config (merge kafka-config
                     {"zookeeper.connect" zk-addr
                      "group.id" group-id})})))

(defn- rewind!
  "reset offsets of all partitions of a topic back to 0"
  [config topic]
  (let [parts (keys (zk/partitions config topic))
        group (get config "group.id")]
    (doseq [p parts]
      (zk/set-offset! config group topic p 0))))

(defn from-beginning
  "return a lazy sequence of meessages from the beginning of a topic"
  [{:keys [config conn]} topic]
  (rewind! config topic)
  (c/messages conn topic))

(defn- from-bytes
  [^bytes b]
  (String. b "UTF-8"))

(defn load-value
  "Load a clojure data structure from a consumed message"
  [{:keys [value]}]
  (try
    (edn/read-string (from-bytes value))
    (catch Exception e
      (println (str "load-value exception: " (.getMessage e))))))

(defn load-text
  [{:keys [value]}]
  (try
    (from-bytes value)
    (catch Exception e
      (println (str "load-text exception: " (.getMessage e))))))

I have a M1 Mac, so it is Arm64 architecture. I have tried different kafka images like bitnami or wurstmeister, but the error still seems to persist. Any help on this issue would be much appreciated

0

There are 0 answers