I am using reactive library for 1-second bar aggregation based on symbol quotes (foreign exchange and CFD)
So far without success on IAsyncEnumerable extension, so I decided to replace pull with push strategy where reactive comes into the play with operators. Unfortunately I am not familiar with it and fell into another problems. Anyway I am very close to achieve the final result.
What I want to achieve is to group by symbol quotes based on "TickTime" for the same second. The problem which I am facing is delay between the quote time - TickTime from broker side and received time - ReceivedTime from channel (delay is approx. 10 to 30 ms). I tried with delay operator in combination with buffer (timespan duration) but I guess it isn't the right option. I know I should be using window operator but I do not know how, because reactive approach is something new to me and I am still learning.
Notice:
If no quote appear in 1 second the observable should post the empty list (some kind of timeout) and later updates is not possible in my situation.
Here is my short code
await Task.Run(() => {
var asset = symbol;
var observable = streamQuotes.ToObservable()
.Delay(TimeSpan.FromMilliseconds(15))
.Buffer(TimeSpan.FromSeconds(1), Scheduler.Default)
.Subscribe(quotes =>
{
var topic = Topic.ToObject($"{provider}-{asset}-{Topic.SECOND}-1");
if (DebugMode)
Logger.LogInformation(
$"{topic} {JsonConvert.SerializeObject(quotes, Formatting.Indented)}");
//_ = AggregateAsync(topic, quotes, epochBegin, epochStep++), cancellationToken);
});
cancellationToken.Register(() => observable.Dispose());
Subscriptions.Add(observable);
}, cancellationToken);
which produces this output:
{
"Id": 142,
"Bid": 45282.72,
"Ask": 45341.63,
"Symbol": "BTCUSD",
"Time": 1707431630102,
"TickTime": "2024-02-08T22:33:50.102+00:00",
"ReceivedTime": "2024-02-08T22:33:50.1187799+00:00"
},
{
"Id": 142,
"Bid": 45283.05,
"Ask": 45341.96,
"Symbol": "BTCUSD",
"Time": 1707431630552,
"TickTime": "2024-02-08T22:33:50.552+00:00",
"ReceivedTime": "2024-02-08T22:33:50.5654973+00:00"
},
{
"Id": 142,
"Bid": 45283.86,
"Ask": 45342.77,
"Symbol": "BTCUSD",
"Time": 1707431630591,
"TickTime": "2024-02-08T22:33:50.591+00:00",
"ReceivedTime": "2024-02-08T22:33:50.6046997+00:00"
},
{
"Id": 142,
"Bid": 45285.07,
"Ask": 45343.98,
"Symbol": "BTCUSD",
"Time": 1707431630702,
"TickTime": "2024-02-08T22:33:50.702+00:00",
"ReceivedTime": "2024-02-08T22:33:50.7155601+00:00"
},
{
"Id": 142,
"Bid": 45284.73,
"Ask": 45343.64,
"Symbol": "BTCUSD",
"Time": 1707431630753,
"TickTime": "2024-02-08T22:33:50.753+00:00",
"ReceivedTime": "2024-02-08T22:33:50.7658716+00:00"
},
{
"Id": 142,
"Bid": 45284.45,
"Ask": 45343.36,
"Symbol": "BTCUSD",
"Time": 1707431630853,
"TickTime": "2024-02-08T22:33:50.853+00:00",
"ReceivedTime": "2024-02-08T22:33:50.866821+00:00"
},
{
"Id": 142,
"Bid": 45284.54,
"Ask": 45343.46,
"Symbol": "BTCUSD",
"Time": 1707431631003,
"TickTime": "2024-02-08T22:33:50.999+00:00",
"ReceivedTime": "2024-02-08T22:33:51.005728+00:00"
},
{
"Id": 142,
"Bid": 45284.64,
"Ask": 45343.55,
"Symbol": "BTCUSD",
"Time": 1707431631040,
"TickTime": "2024-02-08T22:33:51.03+00:00",
"ReceivedTime": "2024-02-08T22:33:51.0530009+00:00"
},
{
"Id": 142,
"Bid": 45284.64,
"Ask": 45343.55,
"Symbol": "BTCUSD",
"Time": 1707431631040,
"TickTime": "2024-02-08T22:33:51.05+00:00",
"ReceivedTime": "2024-02-08T22:33:51.056112+00:00"
}
The output is wrong because of last 2 objects in the list, which belongs to the next second.
Please help me to solve this issue or at least point me to the right direction.
Since the Delay and Buffer rely on a system timer that has an average accuracy of 15ms, your buffering will always slide from second to second as it goes.
You need to rely on absolute event times. Please see my example below which does not rely on timers, but only on absolute times.
Feel free to ask if need explainations.
Update (relying only on "server" timestamp):
One more variant, without grouping, but pure streaming