Please consider the following script:
from confluent_kafka import Consumer
import logging
import time

logger = logging.getLogger()
logger.addHandler(logging.StreamHandler())

consumer_config = {
    "bootstrap.servers": SERVER_ADDRESS,
    "group.id": GROUP_ID,
    "enable.auto.commit": "false",
    'auto.offset.reset': 'earliest',
    'sasl.kerberos.kinit.cmd': f"kinit -VVV -k -t {KEYTAB_FILE} {PRINCIPAL}",
    'security.protocol': 'sasl_plaintext',
    'sasl.mechanism': 'GSSAPI',
    'sasl.kerberos.service.name': 'kafka',
    'sasl.kerberos.keytab': KEYTAB_FILE,
    'sasl.kerberos.principal': PRINCIPAL
}

consumer = Consumer(consumer_config, logger=logger)

try:
    print("Waiting five seconds...")
    time.sleep(5)
except KeyboardInterrupt:
    print("KeyboardInterrupt")
    consumer.close()
This script does nothing on purpose. I can't understand why, if the line:
    consumer = Consumer(consumer_config, logger=logger)
is commented out, the KeyboardInterrupt (e.g. pressing Ctrl+C within the first five seconds after the script starts) is caught properly, whereas if the consumer is initialised, the KeyboardInterrupt is not caught.
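To narrow this down, here is a minimal diagnostic sketch that could be run right before and right after the Consumer is created. It is not a fix and it does not explain the behaviour. It only reports the SIGINT handler that Python's signal module has on record, and whether SIGINT is blocked in the calling thread. The helper names describe_handler and sigint_state are made up for illustration, and a handler installed directly at C level by a library would not show up in this output.

```python
import signal


def describe_handler(handler):
    """Turn a value returned by signal.getsignal() into a short label."""
    if handler is signal.default_int_handler:
        return "python default (raises KeyboardInterrupt)"
    if handler == signal.SIG_IGN:
        return "ignored"
    if handler == signal.SIG_DFL:
        return "os default"
    if handler is None:
        return "not installed from Python"
    return f"custom: {handler!r}"


def sigint_state():
    """Report the recorded SIGINT handler and whether SIGINT is blocked in this thread."""
    if hasattr(signal, "pthread_sigmask"):
        # Blocking an empty set changes nothing and returns the current mask (Unix only).
        blocked = signal.SIGINT in signal.pthread_sigmask(signal.SIG_BLOCK, [])
    else:
        blocked = None  # signal masks cannot be queried on this platform
    return {
        "handler": describe_handler(signal.getsignal(signal.SIGINT)),
        "blocked": blocked,
    }


if __name__ == "__main__":
    print("before:", sigint_state())
    # consumer = Consumer(consumer_config, logger=logger)  # the line under suspicion
    print("after: ", sigint_state())
```

If the two printed states differ, creating the consumer changed the signal setup of the main thread. If they are identical, the cause would have to be looked for elsewhere, for example in something that happens outside Python's view.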