My AvroConsumer
from module confluent_kafka.avro
always raise 'dict' object has no attribute 'get_by_id'
when polling.
Although, when I poll with a simple Consumer
from confluent_kafka
I get the binary serialized.
The ccloud CLI also works perfectly fine to consume the Kafka.
Any idea why the confluent_kafka
client does not work? Is it because of my configuration?
I use confluent-kafka==1.5.0
.
Here is a sample of my python code:
from confluent_kafka.avro import AvroConsumer
conf = {
'bootstrap.servers': MY_BT_SERVERS,
'sasl.mechanisms': "PLAIN",
'security.protocol': "SASL_SSL",
'sasl.username': API_KEY,
'sasl.password': API_PASSWORD,
'group.id': 'group_id',
'auto.offset.reset': 'earliest'
}
schema_registry_conf = {
'url': SR_ENDPOINT,
'basic.auth.user.info': "USER_INFO",
'schema.registry.basic.auth.user.info': f"{SR_API_KEY}:{SR_API_SECRET}"
}
consumer = AvroConsumer(config=conf, schema_registry=schema_registry_conf)
consumer.subscribe(["my-topic"])
message = consumer.poll(5)
That raises:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-22-95673a1ff746> in <module>
----> message = consumer.poll(5)
lib/python3.7/site-packages/confluent_kafka/avro/__init__.py in poll(self, timeout)
164 try:
165 if message.value() is not None:
--> 166 decoded_value = self._serializer.decode_message(message.value(), is_key=False)
167 message.set_value(decoded_value)
168 if message.key() is not None:
/lib/python3.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py in decode_message(self, message, is_key)
229 if magic != MAGIC_BYTE:
230 raise SerializerError("message does not start with magic byte")
--> 231 decoder_func = self._get_decoder_func(schema_id, payload, is_key)
232 return decoder_func(payload)
/lib/python3.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py in _get_decoder_func(self, schema_id, payload, is_key)
161 # fetch writer schema from schema reg
162 try:
--> 163 writer_schema_obj = self.registry_client.get_by_id(schema_id)
164 except ClientError as e:
165 raise SerializerError("unable to fetch schema with id %d: %s" % (schema_id, str(e)))
AttributeError: 'dict' object has no attribute 'get_by_id'
For a clue, I also want to precise that all the serialized messages I poll start with strange \x00\x00\x01\x86\xa1\
bytes I have to get rid of when I manually deserialized my data.
Thanks for any help!
Your error is here -
schema_registry=schema_registry_conf
You passed a dictionary, when you should be passing an instance of the registry client
Producer example