Python Faust hopping window table


I am new to Faust and want to keep track of how many messages I received in the last n seconds, updating the count every l seconds. I thought a hopping table would be suitable for this purpose.
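
To make the goal concrete, here is a minimal plain-Python sketch of the count I am after. It does not use Faust at all: the class name `RecentCounter` and its methods are made up for illustration, and timestamps are passed in explicitly as seconds.

```python
from collections import deque


class RecentCounter:
    """Count events from the last `window` seconds (hypothetical helper)."""

    def __init__(self, window: float) -> None:
        self.window = window
        self._stamps = deque()

    def add(self, ts: float) -> None:
        self._stamps.append(ts)

    def count(self, now: float) -> int:
        # Drop timestamps that are `window` seconds old or older.
        while self._stamps and self._stamps[0] <= now - self.window:
            self._stamps.popleft()
        return len(self._stamps)


counter = RecentCounter(window=5)
counts = []
for ts in range(8):  # one message per second
    counter.add(ts)
    counts.append(counter.count(now=ts))
print(counts)  # [1, 2, 3, 4, 5, 5, 5, 5]
```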

For testing, I produce one message per second to my topic. With n = 5 and l = 1, I expected the following code to count up from one to five and then stay at five as long as messages keep coming in:

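import faust

# Assumed setup, not shown in the original post: the app id, broker URL
# and topic name are placeholders.
app = faust.App("hopping-demo", broker="kafka://localhost:9092")
topic = app.topic("messages")

# Hopping window: size 5 seconds, step 1 second.
hopping_table = app.Table("hopping_table", default=int).hopping(5, 1)


@app.agent(topic)
async def process(stream):
    async for value in stream:
        hopping_table["sum"] += 1
        print(f"sum: {hopping_table['sum'].value()}")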

But as a result, I get:

[2024-02-04 22:50:55,092] [20956] [WARNING] sum: 1 
[2024-02-04 22:50:56,094] [20956] [WARNING] sum: 1 
[2024-02-04 22:50:57,096] [20956] [WARNING] sum: 1 
[2024-02-04 22:50:58,098] [20956] [WARNING] sum: 1 
[2024-02-04 22:50:59,099] [20956] [WARNING] sum: 1 
[2024-02-04 22:51:00,101] [20956] [WARNING] sum: 1

Can I achieve what I want with hopping windows, or is my understanding wrong in general? Unfortunately, I couldn't find much specific information about Faust's sliding windows.
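
For what it is worth, one possible explanation can be sketched in plain Python. With size 5 and step 1, every timestamp belongs to five overlapping windows, and the newest of them has only just opened. The sketch below is not Faust's implementation: `window_starts` and the `counts` dictionary are hypothetical, windows are assumed to be aligned to multiples of the step, and it is an assumption that `.value()` reads the newest window rather than the oldest one.

```python
from collections import defaultdict

SIZE, STEP = 5, 1  # same values as .hopping(5, 1)


def window_starts(ts: int) -> list:
    """Start times of all hopping windows [start, start + SIZE) containing ts."""
    newest = (ts // STEP) * STEP
    oldest = newest - SIZE + STEP
    return list(range(oldest, newest + 1, STEP))


counts = defaultdict(int)  # window start -> number of messages seen
newest_view, oldest_view = [], []
for ts in range(100, 107):  # one message per second
    starts = window_starts(ts)
    for start in starts:
        counts[start] += 1  # the message is counted in every window it falls into
    newest_view.append(counts[starts[-1]])  # window that has just opened
    oldest_view.append(counts[starts[0]])   # window that is about to close

print(newest_view)  # [1, 1, 1, 1, 1, 1, 1]
print(oldest_view)  # [1, 2, 3, 4, 5, 5, 5]
```

In this sketch, reading the newest window gives 1 every time, which matches the log above, while reading the oldest window that still contains the timestamp counts up to five and stays there.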

Thanks in advance
