I have streaming data coming from an Event Hub as follows:
product price quantity
1 55 100
2 44 200
2 43 200
1 60 300
2 55 100
2 44 50
2 47 100
1 44 100
1 48 100
Now I want to calculate the volume-weighted average price (VWAP) per product, but always over only the last 500 units of traded volume, not over all values.
Calculating over a time window is straightforward, but I want to aggregate only over the last 500 units of traded volume instead.
So I am looking for some kind of windowing function without a time constraint. Does that exist, or is it even possible, in Stream Analytics?
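To pin down what "the last 500 units of traded volume" means, here is a minimal plain-Python sketch of the calculation. It is not Stream Analytics query syntax. It assumes trades arrive in order and have positive quantities, and that when the oldest trade only partly fits inside the 500-unit window, just the part that fits is counted. The names rolling_vwap and WINDOW_VOLUME are hypothetical.

```python
from collections import defaultdict, deque

# Sample trades from the question, in arrival order: (product, price, quantity)
TRADES = [
    (1, 55, 100),
    (2, 44, 200),
    (2, 43, 200),
    (1, 60, 300),
    (2, 55, 100),
    (2, 44, 50),
    (2, 47, 100),
    (1, 44, 100),
    (1, 48, 100),
]

WINDOW_VOLUME = 500  # hypothetical name: window size in traded units


def rolling_vwap(trades, window_volume=WINDOW_VOLUME):
    """Yield (product, vwap) after each trade, using only that product's
    most recent `window_volume` units. Assumes positive quantities."""
    windows = defaultdict(deque)  # product -> deque of [price, quantity], oldest first
    volume = defaultdict(int)     # product -> units currently in the window
    notional = defaultdict(int)   # product -> sum of price * quantity in the window
    for product, price, quantity in trades:
        window = windows[product]
        window.append([price, quantity])
        volume[product] += quantity
        notional[product] += price * quantity
        # While the window holds more than window_volume units, remove volume from
        # the oldest trade; it is trimmed rather than dropped if only part is excess.
        while volume[product] > window_volume:
            oldest = window[0]
            taken = min(volume[product] - window_volume, oldest[1])
            oldest[1] -= taken
            volume[product] -= taken
            notional[product] -= oldest[0] * taken
            if oldest[1] == 0:
                window.popleft()
        yield product, notional[product] / volume[product]


for product, vwap in rolling_vwap(TRADES):
    print(product, round(vwap, 2))
```

For the sample data, product 1 ends at 54.4 (300 @ 60, 100 @ 44, 100 @ 48) and product 2 ends at 46.4, because part of its first 200-unit trade has been pushed out of the window. Keeping running totals avoids re-summing the whole window on every event.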
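The formula for the volume-weighted average price is: weighted_average = SUM(price * quantity) / SUM(quantity), taken over the rows in the window.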
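As a worked example of the formula, let p_i be the price and q_i the quantity of trade i. For product 1, the last 500 units of traded volume in the sample are the trades 300 @ 60, 100 @ 44 and 100 @ 48. The earliest trade, 100 @ 55, falls outside the window, so:

```latex
\[
\text{VWAP} = \frac{\sum_i p_i\, q_i}{\sum_i q_i}
            = \frac{60 \cdot 300 + 44 \cdot 100 + 48 \cdot 100}{300 + 100 + 100}
            = \frac{27200}{500} = 54.4
\]
```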
Since your events have a timestamp, you can use that field to compute the average of price, weighted by the quantity column, for each product, using a window frame that includes all rows within the last 500 seconds. Below is the query.
Query:
In this query, the CTE stgquery selects the product, the cumulative sum of the quantity column for each product, and the sum of price * quantity for each product. Both sums use the SUM function with a window frame that includes all rows within the last 500 seconds (this can be changed as required). The final SELECT statement selects the product column and calculates the average of price, weighted by quantity, for each product. It divides the sum of price * quantity (stored in the num column of the stgquery CTE) by the cumulative sum of quantity (stored in the denom column of the stgquery CTE). The result is returned as a new column named weighted_average.
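As a hedged illustration of the num / denom logic described above, here is a plain-Python sketch. It is not Stream Analytics syntax. It computes, per product, the weighted average over the events in the last 500 seconds. The timestamps in EVENTS are hypothetical, because the sample data has none. The boundary rule is also an assumption: events strictly newer than the current timestamp minus 500 s are kept. The names time_window_vwap and WINDOW_SECONDS are illustrative.

```python
from collections import defaultdict, deque

# Sample trades with HYPOTHETICAL timestamps in seconds (the question's data has none):
# (timestamp, product, price, quantity), in time order
EVENTS = [
    (0, 1, 55, 100),
    (100, 2, 44, 200),
    (200, 2, 43, 200),
    (300, 1, 60, 300),
    (400, 2, 55, 100),
    (500, 2, 44, 50),
    (600, 2, 47, 100),
    (700, 1, 44, 100),
    (800, 1, 48, 100),
]

WINDOW_SECONDS = 500  # time-based frame; adjust as required


def time_window_vwap(events, window_seconds=WINDOW_SECONDS):
    """Yield (timestamp, product, weighted_average) after each event, where
    weighted_average = num / denom over that product's events whose timestamp
    is strictly greater than (current timestamp - window_seconds)."""
    windows = defaultdict(deque)  # product -> deque of (timestamp, price, quantity)
    for ts, product, price, quantity in events:
        window = windows[product]
        window.append((ts, price, quantity))
        # Drop events that have aged out of the time frame.
        while window[0][0] <= ts - window_seconds:
            window.popleft()
        num = sum(p * q for _, p, q in window)   # sum of price * quantity
        denom = sum(q for _, _, q in window)     # sum of quantity
        yield ts, product, num / denom


for ts, product, avg in time_window_vwap(EVENTS):
    print(ts, product, round(avg, 2))
```

With these timestamps, product 1's value at t = 800 s is 46.0 because the trades at t = 0 s and t = 300 s have aged out of the frame. Note that a time-based frame covers however much volume traded in that period, not a fixed 500 units.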