Error "KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}" coiming in confluent-kafka

217 views Asked by At

Unable to connect kafka broker using confluent-kafka using oauth. Also I am getting SSL handshake error in the log. but the callback error I am getting "KafkaError{code=\_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}".

from confluent_kafka import Producer
import time

bootstrap_servers="IP Address"
topic = "topic name"

oauth_access_token = 'access token'
def delivery_callback(err, msg):
    if err:
        print('ERROR: Message failed delivery: {}'.format(err), msg)
    else:
        print("message produced in topic", msg)

def kafka_producer_with_oauth():
    conf = {
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'OAUTHBEARER',
        'sasl.oauthbearer.config': f'oauthbearer_token={oauth_access_token}',
        'message.timeout.ms': 10000,
    }

    p = Producer(conf)
    print(p)
    for i in range(2):
        message = f"Message {i}"
        p.produce(topic, message.encode('utf-8'), callback=delivery_callback)
        print("message produced.. ", i)

    p.poll(0)
    p.flush()
kafka_producer_with_oauth()
0

There are 0 answers