How to consume the last message on a Kafka topic, or consume messages starting from a timestamp?

    import time

    from confluent_kafka.avro import AvroConsumer

    def kafkaa(self, auto_offset_reset, timeout=500):
        group_name = "group name"
        config = {"bootstrap.servers": "server",
                  "schema.registry.url": "url",
                  "group.id": group_name,
                  "enable.auto.commit": False,
                  # Expects a string such as "earliest" or "latest", not a boolean
                  "auto.offset.reset": auto_offset_reset,
                  "sasl.mechanisms": "sasl",
                  "security.protocol": "protocol",
                  "sasl.username": "username",
                  "sasl.password": "pw"}
        consumer = AvroConsumer(config)
        data_consumed = []
        consumer.subscribe([kafkaTopic])  # subscribe() takes a list of topic names
        deadline = time.time() + timeout  # compute the deadline once, before the loop
        while time.time() < deadline:
            message = consumer.poll(1.0)  # short poll timeout so the deadline is re-checked
            if message is None or message.error():
                continue
            data_consumed.append(message)
            consumer.commit(asynchronous=False)
        consumer.close()
        return data_consumed

With auto.offset.reset = latest and a group id that has not been used before, this returns nothing, because the topic does not always receive new messages while the consumer is running.

With auto.offset.reset = latest and an existing group id, this returns everything after the committed offset until the timeout, but the broker restart



Answer by OneCricketeer

To consume from a timestamp, use the consumer's offsets_for_times() function to look up, for each partition of the topic, the earliest offset whose timestamp is at or after that timestamp. Then seek (or assign) the consumer to those partition offsets before you begin polling.
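A minimal sketch of the timestamp lookup, assuming the confluent_kafka Python client (which provides the question's AvroConsumer). The helper names `to_epoch_ms` and `assign_from_timestamp` are hypothetical; `list_topics`, `offsets_for_times` and `assign` are the client's own methods. It passes the looked-up offsets to `assign()` instead of calling `seek()`, because in this client `seek()` only applies to partitions that are already assigned and being fetched.

```python
from datetime import datetime, timezone


def to_epoch_ms(when: datetime) -> int:
    """Convert an aware datetime to milliseconds since the epoch, as offsets_for_times() expects."""
    if when.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime, e.g. tzinfo=timezone.utc")
    return int(when.timestamp() * 1000)


def assign_from_timestamp(consumer, topic: str, when: datetime, timeout: float = 10.0):
    """Assign `consumer` to every partition of `topic`, starting at `when`; returns the offsets."""
    # Imported lazily so to_epoch_ms() works without confluent_kafka installed.
    from confluent_kafka import TopicPartition

    ts_ms = to_epoch_ms(when)
    partitions = consumer.list_topics(topic, timeout=timeout).topics[topic].partitions
    # For the lookup, the timestamp is passed in each TopicPartition's offset field.
    query = [TopicPartition(topic, p, ts_ms) for p in partitions]
    # Each result holds the earliest offset whose timestamp is >= ts_ms, or -1
    # (OFFSET_END) when the partition has no message that recent.
    offsets = consumer.offsets_for_times(query, timeout=timeout)
    consumer.assign(offsets)  # explicit starting offsets, so no seek() is needed
    return offsets
```

For example, call `assign_from_timestamp(consumer, "my-topic", datetime(2024, 1, 1, tzinfo=timezone.utc))` (the topic name is a placeholder) and then run the usual `consumer.poll(1.0)` loop.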

The values of auto.offset.reset and group.id don't matter here unless you want to commit offsets, which isn't required.

For the very last message, get the topic's high watermarks (one per partition), subtract one, and seek to those offsets. This gives you the last message written to each partition before your consumer started; it may not actually be the latest by the time you read it if producers are sending thousands of events per second.
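A similar sketch for the last message, under the same assumption (the confluent_kafka client; `last_message_offsets` and `assign_to_last_messages` are hypothetical names). The client's `get_watermark_offsets()` returns `(low, high)`, where `high` is the offset the next message will receive, so the last stored message sits at `high - 1`.

```python
from __future__ import annotations


def last_message_offsets(watermarks: dict[int, tuple[int, int]]) -> dict[int, int]:
    """Map partition id -> offset of its last message, given (low, high) watermarks.

    The high watermark is the offset the next produced message will get, so the
    last stored message is at high - 1. Empty partitions (high <= low) are skipped.
    """
    return {p: high - 1 for p, (low, high) in watermarks.items() if high > low}


def assign_to_last_messages(consumer, topic: str, timeout: float = 10.0) -> dict[int, int]:
    """Assign `consumer` so its next poll() on each partition returns that partition's last message."""
    # Imported lazily so last_message_offsets() works without confluent_kafka installed.
    from confluent_kafka import TopicPartition

    partitions = consumer.list_topics(topic, timeout=timeout).topics[topic].partitions
    watermarks = {}
    for p in partitions:
        wm = consumer.get_watermark_offsets(TopicPartition(topic, p), timeout=timeout)
        if wm is not None:  # None means no answer within the timeout
            watermarks[p] = wm
    last = last_message_offsets(watermarks)
    # Passing explicit offsets to assign() sets where consumption starts.
    consumer.assign([TopicPartition(topic, p, offset) for p, offset in last.items()])
    return last
```

After this call, polling returns at most one message per non-empty partition before newer ones arrive; if you need the single newest message across partitions, compare the values returned by `message.timestamp()` on those results.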