I'm using the php-rdkafka library to write a Kafka consumer for an API project. I need to find the last offset in a topic and read the message stored at that offset for further processing. For example, if the last offset in the topic is 5, I need to fetch the message at offset 5 and send it through the API until a new message is added. This is what I'm trying to run:
$conf = new RdKafka\Conf();
$settings = [
    'socket.keepalive.enable' => true,
    'log_level' => LOG_WARNING,
    'enable.auto.offset.store' => 'true',
    'auto.offset.reset' => 'earliest',
    'enable.partition.eof' => 'false',
    'enable.auto.commit' => 'false',
    'max.poll.interval.ms' => 300000,
    'session.timeout.ms' => 45000,
    'group.id' => 'test-group',
    'group.instance.id' => uniqid('', true),
    'metadata.broker.list' => 'stat-kafka-1:9092,stat-kafka-2:9092,stat-kafka-3:9092',
];
// Apply every setting to the configuration object.
foreach ($settings as $key => $value) {
    $conf->set($key, $value);
}
// Create the high-level consumer from the configuration (missing from the original snippet).
$consumer = new RdKafka\KafkaConsumer($conf);
$topicName = 'userstatistics_12345';
$partition = 0;
$topicPartition = new RdKafka\TopicPartition($topicName, $partition);
// Ask the consumer for its current position in the partition.
$topicPartitionsWithOffsets = $consumer->getOffsetPositions([$topicPartition]);
var_dump($topicPartitionsWithOffsets);
but this returns a strange result with a negative offset:
array(1) { [0]=> object(RdKafka\TopicPartition)#6 (4) { ["topic"]=> string(20) "userstatistics_12345" ["partition"]=> int(0) ["offset"]=> int(-1001) ["err"]=> int(0) } }
In fact, the current last offset is 59. My idea is to get the last offset number and then fetch the message at that offset using:
$consumer->assign([
new RdKafka\TopicPartition($topicName, $partition, $lastOffset)
]);
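For context, -1001 appears to be librdkafka's RD_KAFKA_OFFSET_INVALID, which getOffsetPositions() reports when the consumer has not yet fetched anything from the partition, so it is the consumer's position rather than the end of the topic. One possible way to get the end of the partition is queryWatermarkOffsets(), which asks the broker for the low and high watermarks. The following is only a sketch under assumptions: lastMessageOffset() and fetchLastMessage() are hypothetical helper names, the 10-second timeout is arbitrary, and on compacted or transactional topics the offset high - 1 may not hold a regular message.

```php
<?php
// Sketch only: both helpers below are hypothetical, not part of php-rdkafka.

/**
 * Offset of the last message for the given watermarks, or null if the
 * partition currently holds no messages.
 */
function lastMessageOffset(int $low, int $high): ?int
{
    // The high watermark is the offset the NEXT produced message will get,
    // so the last existing message sits one position before it.
    return $high > $low ? $high - 1 : null;
}

/**
 * Reads the last message of one partition with a single consume() call.
 */
function fetchLastMessage(
    RdKafka\KafkaConsumer $consumer,
    string $topicName,
    int $partition,
    int $timeoutMs = 10000
): ?RdKafka\Message {
    $low = 0;
    $high = 0;
    // Filled by reference with the oldest offset and the next offset.
    $consumer->queryWatermarkOffsets($topicName, $partition, $low, $high, $timeoutMs);

    $lastOffset = lastMessageOffset($low, $high);
    if ($lastOffset === null) {
        return null; // empty partition, nothing to read
    }

    // Start reading exactly at the last message instead of a committed offset.
    $consumer->assign([new RdKafka\TopicPartition($topicName, $partition, $lastOffset)]);
    $message = $consumer->consume($timeoutMs);

    // Timeouts and other errors are reported through $message->err.
    return $message->err === RD_KAFKA_RESP_ERR_NO_ERROR ? $message : null;
}
```

With the consumer from the snippet above, this would be called as fetchLastMessage($consumer, $topicName, $partition), and the message body would then be available in $message->payload. A single consume() call with a timeout is used here rather than a polling loop.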
I also don't want to use a while(true) loop, so that the script finishes quickly.
That's all. Thanks.
I found an answer that works fine for me: