import time

from confluent_kafka.avro import AvroConsumer


def kafkaa(self, auto_offset_reset, timeout=500):
    group_name = "group name"
    config = {"bootstrap.servers": "server",
              "schema.registry.url": "url",
              "group.id": group_name,
              "enable.auto.commit": False,
              "auto.offset.reset": auto_offset_reset,  # "latest" or "earliest"
              "sasl.mechanisms": "sasl",
              "security.protocol": "protocol",
              "sasl.username": "username",
              "sasl.password": "pw"}
    consumer = AvroConsumer(config)
    data_consumed = []
    consumer.subscribe([kafkaTopic])  # subscribe() expects a list of topics
    deadline = time.time() + timeout  # compute the deadline once, before the loop
    while True:
        if time.time() > deadline:
            break
        message = consumer.poll(1.0)
        if message is not None and message.error() is None:
            data_consumed.append(message)
            consumer.commit(asynchronous=False)
    consumer.close()
    return data_consumed
With auto.offset.reset = latest and a group id that has not been used before, this does not return any values, because the stream does not always have a message to consume.
With auto.offset.reset = latest and an existing group id, this returns everything after the committed offset until the timeout, but the behaviour changes when the broker restarts.
You'll need to use the `offsets_for_times` function to look up the offsets of the topic's partitions for that timestamp, then seek the consumer to those partition offsets before you can begin polling from that timestamp. The values of auto.offset.reset and group.id don't matter unless you want to commit offsets, which isn't a requirement.
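A minimal sketch of that lookup with the plain confluent_kafka Consumer; the broker address, group id, topic name, and timestamp below are placeholders, not values from your setup:

```python
import time

from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({"bootstrap.servers": "server", "group.id": "any-group"})

topic = "my_topic"  # placeholder topic name
target_ts_ms = int((time.time() - 3600) * 1000)  # e.g. one hour ago, in milliseconds

# Build one TopicPartition per partition, carrying the target timestamp
# in the offset field, as offsets_for_times() expects.
metadata = consumer.list_topics(topic=topic)
query = [TopicPartition(topic, p, target_ts_ms)
         for p in metadata.topics[topic].partitions]

# For each partition this returns the earliest offset whose message
# timestamp is >= the requested timestamp.
offsets = consumer.offsets_for_times(query, timeout=10)

# Assigning with explicit offsets positions the consumer there (taking the
# place of a per-partition seek), so polling starts from that timestamp.
consumer.assign(offsets)
message = consumer.poll(10)
```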
Alternatively, get the topic's high watermarks, subtract one, and seek to those offsets to get the very last message sent before you started your consumer (it may not actually be the last one when producers are sending thousands of events per second).
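A sketch of that high-watermark approach, under the same placeholder assumptions as above:

```python
from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({"bootstrap.servers": "server", "group.id": "any-group"})

topic = "my_topic"  # placeholder topic name
metadata = consumer.list_topics(topic=topic)

assignments = []
for p in metadata.topics[topic].partitions:
    # get_watermark_offsets() returns (low, high); high is the offset the next
    # produced message will get, so high - 1 is the last existing message.
    low, high = consumer.get_watermark_offsets(TopicPartition(topic, p), timeout=10)
    if high > low:  # skip empty partitions
        assignments.append(TopicPartition(topic, p, high - 1))

consumer.assign(assignments)

# One poll per non-empty partition returns the last message of each.
for _ in assignments:
    msg = consumer.poll(10)
    if msg is not None and msg.error() is None:
        print(msg.value())
```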