I'm using the kafka python client to push messages to Message Hub, but noticed that after a while of running my app that it would stop sending messages to Message Hub.
I then noticed the following in my log files:
ConnectionError: socket disconnected
I updated my code to add retries=5
:
from kafka import KafkaProducer
from kafka.errors import KafkaError
import ssl
sasl_mechanism = 'PLAIN'
security_protocol = 'SASL_SSL'
# Create a new context using system defaults, disable all but TLS1.2
context = ssl.create_default_context()
context.options &= ssl.OP_NO_TLSv1
context.options &= ssl.OP_NO_TLSv1_1
producer = KafkaProducer(bootstrap_servers = app.config['KAFKA_BROKERS_SASL'],
sasl_plain_username = app.config['KAFKA_USERNAME'],
sasl_plain_password = app.config['KAFKA_PASSWORD'],
security_protocol = security_protocol,
ssl_context = context,
sasl_mechanism = sasl_mechanism,
api_version = (0,10),
retries=5)
def send_message(message):
try:
producer.send(app.config['KAFKA_TOPIC'], message.encode('utf-8'))
# FIXME sending seems to be unreliable unless we flush
producer.flush()
except:
print("Unexpected error:", sys.exc_info()[0])
raise
My code runs from a python flask app. Every request of a certain type that comes in calls the send_message()
method.
Here are the relevant log lines from Bluemix. I may have missed a line or two copying and pasting, but hopefully it is enough to figure out what is happening:
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: Authenticated as xxxxx May 15, 2017 8:50:54 PM
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: socket disconnected May 15, 2017 8:50:54 PM
APP/1 Got error produce response on topic-partition TopicPartition(topic='movie_ratings', partition=0), retrying (4 attempts left). Error: ConnectionError: socket disconnectedMay 15, 2017 8:50:54 PM
APP/1 Node 0 connection failed -- refreshing metadata May 15, 2017 8:50:54 PM
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: Authenticated as xxxxxMay 15, 2017 8:50:54 PM
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: socket disconnected May 15, 2017 8:50:54 PM
APP/1 Node 0 connection failed -- refreshing metadata May 15, 2017 8:50:54 PM
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: Authenticated as xxxxx May 15, 2017 8:50:54 PM
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: socket disconnected May 15, 2017 8:50:55 PM
APP/1 Node 0 connection failed -- refreshing metadata May 15, 2017 8:50:55 PM
APP/1 Got error produce response on topic-partition TopicPartition(topic='movie_ratings', partition=0), retrying (2 attempts left). Error: ConnectionError: socket disconnected May 15, 2017 8:50:55 PM
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: Authenticated as xxxxx May 15, 2017 8:50:55 PM
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: socket disconnected May 15, 2017 8:50:55 PM
APP/1 Node 0 connection failed -- refreshing metadata May 15, 2017 8:50:55 PM
APP/1 Got error produce response on topic-partition TopicPartition(topic='movie_ratings', partition=0), retrying (1 attempts left). Error: ConnectionError: socket disconnected May 15, 2017 8:50:55 PM
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: Authenticated as xxxxx May 15, 2017 8:50:55 PM
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: socket disconnected May 15, 2017 8:50:55 PM
APP/1 Got error produce response on topic-partition TopicPartition(topic='movie_ratings', partition=0), retrying (0 attempts left). Error: ConnectionError: socket disconnected May 15, 2017 8:50:55 PM
APP/1 Node 0 connection failed -- refreshing metadata May 15, 2017 8:50:55 PM
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: Authenticated as xxxxx May 15, 2017 8:50:55 PM
APP/1 Node 0 connection failed -- refreshing metadata May 15, 2017 8:50:56 PM
APP/1 ConnectionError: socket disconnected May 15, 2017 8:50:56 PM
APP/1 Unable to import 'sasl'. Fallback to 'puresasl'. May 15, 2017 8:51:00 PM
APP/1 Closing active operation May 15, 2017 8:51:01 PM
My topic exists. My full client code is here: https://github.com/snowch/movie-recommender-demo/blob/effc981cc9f799c41952719619f693172eebcd6a/web_app/app/messagehub_client.py
Any pointers most appreciated ...
As per Dominic's comment, upgrading kafka python to 1.3.3 fixed the issue for me.