How to connect to Kafka cluster in docker compose from python program running on localhost

17 views Asked by At

I am newbie in using Docker compose and Python. For development purposes I have to configure kafka cluster of 3 brokers using docker-compose. How can i read messages from brokers in docker compose using python application?

docker-compose.yml was created using this

version: '3.5'
services:
  kafka-gen:
    image: confluentinc/cp-kafka:7.3.3
    hostname: kafka-gen
    container_name: kafka-gen
    volumes:
      - ./scripts/create_cluster_id.sh:/tmp/create_cluster_id.sh
      - ./clusterID:/tmp/clusterID
    command: "bash -c '/tmp/create_cluster_id.sh'"

  kafka1:
    image: confluentinc/cp-kafka:7.3.3
    hostname: kafka1
    container_name: kafka1
    ports:
      - "39092:39092"
      - "49092:49092"
    environment:
      KAFKA_LISTENERS: BROKER://kafka1:19092,EXTERNAL://kafka1:39092,CONTROLLER://kafka1:9093,CONNECTIONS_FROM_HOST://0.0.0.0:49092
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka1:19092,EXTERNAL://kafka1:39092,CONNECTIONS_FROM_HOST://localhost:49092
      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_PROCESS_ROLES: 'controller,broker'
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_METADATA_LOG_SEGMENT_MS: 15000
      KAFKA_METADATA_MAX_RETENTION_MS: 1200000
      KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    volumes:
      - kafka1-data:/var/lib/kafka
      - ./scripts/update_run.sh:/tmp/update_run.sh
      - ./clusterID:/tmp/clusterID
    command: "bash -c '/tmp/update_run.sh && /etc/confluent/docker/run'"

  kafka2:
    image: confluentinc/cp-kafka:7.3.3
    hostname: kafka2
    container_name: kafka2
    ports:
      - "39093:39093"
    environment:
      KAFKA_LISTENERS: BROKER://kafka2:19093,EXTERNAL://kafka2:39093,CONTROLLER://kafka2:9093
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka2:19093,EXTERNAL://kafka2:39093
      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_PROCESS_ROLES: 'controller,broker'
      KAFKA_NODE_ID: 2
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_METADATA_LOG_SEGMENT_MS: 15000
      KAFKA_METADATA_MAX_RETENTION_MS: 1200000
      KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    volumes:
      - kafka2-data:/var/lib/kafka
      - ./scripts/update_run.sh:/tmp/update_run.sh
      - ./clusterID:/tmp/clusterID
    command: "bash -c '/tmp/update_run.sh && /etc/confluent/docker/run'"

  kafka3:
    image: confluentinc/cp-kafka:7.3.3
    hostname: kafka3
    container_name: kafka3
    ports:
      - "39094:39094"
    environment:
      KAFKA_LISTENERS: BROKER://kafka3:19094,EXTERNAL://kafka3:39094,CONTROLLER://kafka3:9093
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka3:19094,EXTERNAL://kafka3:39094
      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_PROCESS_ROLES: 'controller,broker'
      KAFKA_NODE_ID: 3
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_METADATA_LOG_SEGMENT_MS: 15000
      KAFKA_METADATA_MAX_RETENTION_MS: 1200000
      KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    volumes:
      - kafka3-data:/var/lib/kafka
      - ./scripts/update_run.sh:/tmp/update_run.sh
      - ./clusterID:/tmp/clusterID
    command: "bash -c '/tmp/update_run.sh && /etc/confluent/docker/run'"

  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8080:8080
    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'


volumes:
  kafka1-data:
  kafka2-data:
  kafka3-data:

python code

from confluent_kafka import Producer, Consumer, KafkaError, KafkaException


def Consume():
    print('\n<Consuming>')
    c = Consumer({
        'bootstrap.servers':  'localhost:49092',
        'group.id': 'rmoff'
    })

    c.subscribe(["test"])
    try:
        msgs = c.consume(num_messages=1,timeout=30)

        if len(msgs)==0:
            print("❌ No message(s) consumed (maybe we timed out waiting?)\n")
        else:
            for msg in msgs:
                print('✅    Message received:  "{}" from topic {}\n'.format(msg.value().decode('utf-8'),msg.topic()))
    except Exception as e:
        print("❌ Consumer error: {}\n".format(e))
    c.close()



Consume()

I read in the docs about configuration and tried to use it. I tried to use python kafka consumer and could not receive message with timeout error. I have no idea how to fix this configuration. At the same time I can produce message from kafka-ui. I guess it works because it has the same network as brokers.

0

There are 0 answers