I am new to Docker Compose and Python. For development purposes I need to configure a Kafka cluster of 3 brokers using docker-compose. How can I read messages from the brokers running in Docker Compose using a Python application?
The docker-compose.yml was created using this:
version: '3.5'
services:
  kafka-gen:
    image: confluentinc/cp-kafka:7.3.3
    hostname: kafka-gen
    container_name: kafka-gen
    volumes:
      - ./scripts/create_cluster_id.sh:/tmp/create_cluster_id.sh
      - ./clusterID:/tmp/clusterID
    command: "bash -c '/tmp/create_cluster_id.sh'"
  kafka1:
    image: confluentinc/cp-kafka:7.3.3
    hostname: kafka1
    container_name: kafka1
    ports:
      - "39092:39092"
      - "49092:49092"
    environment:
      KAFKA_LISTENERS: BROKER://kafka1:19092,EXTERNAL://kafka1:39092,CONTROLLER://kafka1:9093,CONNECTIONS_FROM_HOST://0.0.0.0:49092
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka1:19092,EXTERNAL://kafka1:39092,CONNECTIONS_FROM_HOST://localhost:49092
      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_PROCESS_ROLES: 'controller,broker'
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_METADATA_LOG_SEGMENT_MS: 15000
      KAFKA_METADATA_MAX_RETENTION_MS: 1200000
      KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    volumes:
      - kafka1-data:/var/lib/kafka
      - ./scripts/update_run.sh:/tmp/update_run.sh
      - ./clusterID:/tmp/clusterID
    command: "bash -c '/tmp/update_run.sh && /etc/confluent/docker/run'"
  kafka2:
    image: confluentinc/cp-kafka:7.3.3
    hostname: kafka2
    container_name: kafka2
    ports:
      - "39093:39093"
    environment:
      KAFKA_LISTENERS: BROKER://kafka2:19093,EXTERNAL://kafka2:39093,CONTROLLER://kafka2:9093
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka2:19093,EXTERNAL://kafka2:39093
      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_PROCESS_ROLES: 'controller,broker'
      KAFKA_NODE_ID: 2
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_METADATA_LOG_SEGMENT_MS: 15000
      KAFKA_METADATA_MAX_RETENTION_MS: 1200000
      KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    volumes:
      - kafka2-data:/var/lib/kafka
      - ./scripts/update_run.sh:/tmp/update_run.sh
      - ./clusterID:/tmp/clusterID
    command: "bash -c '/tmp/update_run.sh && /etc/confluent/docker/run'"
  kafka3:
    image: confluentinc/cp-kafka:7.3.3
    hostname: kafka3
    container_name: kafka3
    ports:
      - "39094:39094"
    environment:
      KAFKA_LISTENERS: BROKER://kafka3:19094,EXTERNAL://kafka3:39094,CONTROLLER://kafka3:9093
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka3:19094,EXTERNAL://kafka3:39094
      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_PROCESS_ROLES: 'controller,broker'
      KAFKA_NODE_ID: 3
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_METADATA_LOG_SEGMENT_MS: 15000
      KAFKA_METADATA_MAX_RETENTION_MS: 1200000
      KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    volumes:
      - kafka3-data:/var/lib/kafka
      - ./scripts/update_run.sh:/tmp/update_run.sh
      - ./clusterID:/tmp/clusterID
    command: "bash -c '/tmp/update_run.sh && /etc/confluent/docker/run'"
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8080:8080
    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'
volumes:
  kafka1-data:
  kafka2-data:
  kafka3-data:
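To make the listener settings easier to compare across brokers, here is a minimal sketch that parses the KAFKA_ADVERTISED_LISTENERS values copied from the file above and shows which address each broker advertises for a given listener name. The helper names parse_listeners and advertised_for are my own and purely illustrative. As far as I understand, a client that bootstraps through one listener is handed back the advertised address of that same listener, so a missing entry or a container-only hostname may matter here.

```python
# Advertised listener strings copied verbatim from the compose file above.
ADVERTISED = {
    "kafka1": "BROKER://kafka1:19092,EXTERNAL://kafka1:39092,"
              "CONNECTIONS_FROM_HOST://localhost:49092",
    "kafka2": "BROKER://kafka2:19093,EXTERNAL://kafka2:39093",
    "kafka3": "BROKER://kafka3:19094,EXTERNAL://kafka3:39094",
}


def parse_listeners(value):
    """Split 'NAME://host:port,...' into a dict {NAME: (host, port)}."""
    result = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        result[name] = (host, int(port))
    return result


def advertised_for(listener_name):
    """Return {broker: (host, port) or None} for one listener name."""
    return {
        broker: parse_listeners(value).get(listener_name)
        for broker, value in ADVERTISED.items()
    }


if __name__ == "__main__":
    for name in ("CONNECTIONS_FROM_HOST", "EXTERNAL"):
        print(name)
        for broker, address in advertised_for(name).items():
            print("  ", broker, address)
```

With these values, only kafka1 advertises a CONNECTIONS_FROM_HOST address, and the EXTERNAL listeners advertise the hostnames kafka1, kafka2 and kafka3, which I assume resolve only inside the Compose network unless the host maps them itself.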
Python code:
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException

def Consume():
    print('\n<Consuming>')
    c = Consumer({
        'bootstrap.servers': 'localhost:49092',
        'group.id': 'rmoff'
    })
    c.subscribe(["test"])
    try:
        msgs = c.consume(num_messages=1,timeout=30)
        if len(msgs)==0:
            print("❌ No message(s) consumed (maybe we timed out waiting?)\n")
        else:
            for msg in msgs:
                print('✅ Message received: "{}" from topic {}\n'.format(msg.value().decode('utf-8'),msg.topic()))
    except Exception as e:
        print("❌ Consumer error: {}\n".format(e))
    c.close()

Consume()
I read the docs about the configuration and tried to apply it. I then tried to read with the Python Kafka consumer, but it received no message and ended with a timeout error. I have no idea how to fix this configuration. At the same time, I can produce messages from kafka-ui. I guess that works because kafka-ui is on the same network as the brokers.
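For reference, a small check like the one below could confirm whether the ports published in the compose file are reachable from the host at all. It is only a sketch using the standard library; the function name can_connect and the PUBLISHED list are my own, with the ports taken from the ports sections above. An open port only shows that Docker publishes it, not that the address the broker advertises back to the client is usable.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False


# Host ports published by kafka1, kafka2 and kafka3 in the compose file.
PUBLISHED = [
    ("localhost", 49092),
    ("localhost", 39092),
    ("localhost", 39093),
    ("localhost", 39094),
]

if __name__ == "__main__":
    for host, port in PUBLISHED:
        print("{}:{} reachable: {}".format(host, port, can_connect(host, port)))
```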