Unable to connect kafka broker using confluent-kafka using oauth. Also I am getting SSL handshake error in the log. but the callback error I am getting "KafkaError{code=\_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}"
.
from confluent_kafka import Producer
import time
bootstrap_servers="IP Address"
topic = "topic name"
oauth_access_token = 'access token'
def delivery_callback(err, msg):
if err:
print('ERROR: Message failed delivery: {}'.format(err), msg)
else:
print("message produced in topic", msg)
def kafka_producer_with_oauth():
conf = {
'bootstrap.servers': bootstrap_servers,
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'OAUTHBEARER',
'sasl.oauthbearer.config': f'oauthbearer_token={oauth_access_token}',
'message.timeout.ms': 10000,
}
p = Producer(conf)
print(p)
for i in range(2):
message = f"Message {i}"
p.produce(topic, message.encode('utf-8'), callback=delivery_callback)
print("message produced.. ", i)
p.poll(0)
p.flush()
kafka_producer_with_oauth()