The following application is a simple consumer which prints all messages to the console.
#!/usr/bin/env python
import confluent_kafka
consumer = confluent_kafka.Consumer({
'bootstrap.servers': 'kafka05-prod01.messagehub.services.us-south.bluemix.net:9093,kafka03-prod01.messagehub.services.us-south.bluemix.net:9093,kafka01-prod01.messagehub.services.us-south.bluemix.net:9093,kafka04-prod01.messagehub.services.us-south.bluemix.net:9093,kafka02-prod01.messagehub.services.us-south.bluemix.net:9093',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'sasl.username': 'XXX',
'sasl.password': 'XXX',
'api.version.request': True,
'client.id': 'consumer01',
'group.id': 'group01',
})
consumer.subscribe(['logs'])
while True:
msg = consumer.poll(1)
if msg is not None and msg.error() is None:
print(msg.value().decode('utf-8'))
In the beginning it works fine. Several hours later, I see the following error messages. Once I restart the script it works fine again.
^C%3|1504028772.615|FAIL|consumer01#consumer-1| [thrd:sasl_ssl://kafka08-prod01.messagehub.services.us-south.bluemix.]: sasl_ssl://kafka08-prod01.messagehub.services.us-south.bluemix.net:9093/7: Failed to initialize SASL authentication: SASL Handshake not supported by broker (required by mechanism PLAIN) %3|1504028772.615|ERROR|consumer01#consumer-1| [thrd:sasl_ssl://kafka08-prod01.messagehub.services.us-south.bluemix.]: sasl_ssl://kafka08-prod01.messagehub.services.us-south.bluemix.net:9093/7: Failed to initialize SASL authentication: SASL Handshake not supported by broker (required by mechanism PLAIN)
There was a Message Hub outage at the times mentioned in your logs, so it's likely to be related to that.