Rx.NET Buffer with delay problem on quotes aggregation

I am using the Rx.NET (Reactive Extensions) library to aggregate symbol quotes (foreign exchange and CFDs) into 1-second bars.

I had no success with the IAsyncEnumerable extensions, so I decided to replace the pull strategy with a push strategy, which is where Rx and its operators come into play. Unfortunately, I am not familiar with Rx and ran into other problems. Still, I am very close to the final result.

What I want to achieve is to group each symbol's quotes by "TickTime", so that quotes from the same second end up together. The problem I am facing is the delay between the quote time on the broker side (TickTime) and the time the quote is received from the channel (ReceivedTime); the delay is approximately 10 to 30 ms. I tried the Delay operator in combination with Buffer (with a timespan duration), but I suspect it isn't the right option. I know I should be using the Window operator, but I do not know how, because the reactive approach is new to me and I am still learning.

Note:
If no quote arrives within a second, the observable should still emit an empty list (a kind of timeout); later updates are not possible in my situation.

Here is a short version of my code:

    await Task.Run(() =>
    {
        var asset = symbol;

        var observable = streamQuotes.ToObservable()
            .Delay(TimeSpan.FromMilliseconds(15))                // shift every quote by 15 ms
            .Buffer(TimeSpan.FromSeconds(1), Scheduler.Default)  // emit a list every second of timer time
            .Subscribe(quotes =>
            {
                var topic = Topic.ToObject($"{provider}-{asset}-{Topic.SECOND}-1");

                if (DebugMode)
                    Logger.LogInformation(
                        $"{topic} {JsonConvert.SerializeObject(quotes, Formatting.Indented)}");

                //_ = AggregateAsync(topic, quotes, epochBegin, epochStep++, cancellationToken);
            });

        cancellationToken.Register(() => observable.Dispose());
        Subscriptions.Add(observable);

    }, cancellationToken);

which produces this output:

  {
    "Id": 142,
    "Bid": 45282.72,
    "Ask": 45341.63,
    "Symbol": "BTCUSD",
    "Time": 1707431630102,
    "TickTime": "2024-02-08T22:33:50.102+00:00",
    "ReceivedTime": "2024-02-08T22:33:50.1187799+00:00"
  },
  {
    "Id": 142,
    "Bid": 45283.05,
    "Ask": 45341.96,
    "Symbol": "BTCUSD",
    "Time": 1707431630552,
    "TickTime": "2024-02-08T22:33:50.552+00:00",
    "ReceivedTime": "2024-02-08T22:33:50.5654973+00:00"
  },
  {
    "Id": 142,
    "Bid": 45283.86,
    "Ask": 45342.77,
    "Symbol": "BTCUSD",
    "Time": 1707431630591,
    "TickTime": "2024-02-08T22:33:50.591+00:00",
    "ReceivedTime": "2024-02-08T22:33:50.6046997+00:00"
  },
  {
    "Id": 142,
    "Bid": 45285.07,
    "Ask": 45343.98,
    "Symbol": "BTCUSD",
    "Time": 1707431630702,
    "TickTime": "2024-02-08T22:33:50.702+00:00",
    "ReceivedTime": "2024-02-08T22:33:50.7155601+00:00"
  },
  {
    "Id": 142,
    "Bid": 45284.73,
    "Ask": 45343.64,
    "Symbol": "BTCUSD",
    "Time": 1707431630753,
    "TickTime": "2024-02-08T22:33:50.753+00:00",
    "ReceivedTime": "2024-02-08T22:33:50.7658716+00:00"
  },
  {
    "Id": 142,
    "Bid": 45284.45,
    "Ask": 45343.36,
    "Symbol": "BTCUSD",
    "Time": 1707431630853,
    "TickTime": "2024-02-08T22:33:50.853+00:00",
    "ReceivedTime": "2024-02-08T22:33:50.866821+00:00"
  },
  {
    "Id": 142,
    "Bid": 45284.54,
    "Ask": 45343.46,
    "Symbol": "BTCUSD",
    "Time": 1707431631003,
    "TickTime": "2024-02-08T22:33:50.999+00:00",
    "ReceivedTime": "2024-02-08T22:33:51.005728+00:00"
  },
  {
    "Id": 142,
    "Bid": 45284.64,
    "Ask": 45343.55,
    "Symbol": "BTCUSD",
    "Time": 1707431631040,
    "TickTime": "2024-02-08T22:33:51.03+00:00",
    "ReceivedTime": "2024-02-08T22:33:51.0530009+00:00"
  },
  {
    "Id": 142,
    "Bid": 45284.64,
    "Ask": 45343.55,
    "Symbol": "BTCUSD",
    "Time": 1707431631040,
    "TickTime": "2024-02-08T22:33:51.05+00:00",
    "ReceivedTime": "2024-02-08T22:33:51.056112+00:00"
  }

The output is wrong because of the last two objects in the list: their TickTime (22:33:51.03 and 22:33:51.05) belongs to the next second.
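
A small worked example (plain .NET, no Rx) makes the mismatch concrete: it truncates the TickTime and ReceivedTime values from the output above to whole seconds and compares them. The helper names `TruncateToSecond` and `ParseTime` are illustrative only. By TickTime, seven quotes fall in second 50 and two in second 51; only the seventh quote (TickTime 50.999, received at 51.005) lands in a different second depending on which timestamp is used.

```csharp
using System;
using System.Globalization;
using System.Linq;

// TickTime / ReceivedTime pairs copied from the output above.
var samples = new (string Tick, string Received)[]
{
    ("2024-02-08T22:33:50.102+00:00", "2024-02-08T22:33:50.1187799+00:00"),
    ("2024-02-08T22:33:50.552+00:00", "2024-02-08T22:33:50.5654973+00:00"),
    ("2024-02-08T22:33:50.591+00:00", "2024-02-08T22:33:50.6046997+00:00"),
    ("2024-02-08T22:33:50.702+00:00", "2024-02-08T22:33:50.7155601+00:00"),
    ("2024-02-08T22:33:50.753+00:00", "2024-02-08T22:33:50.7658716+00:00"),
    ("2024-02-08T22:33:50.853+00:00", "2024-02-08T22:33:50.866821+00:00"),
    ("2024-02-08T22:33:50.999+00:00", "2024-02-08T22:33:51.005728+00:00"),
    ("2024-02-08T22:33:51.03+00:00",  "2024-02-08T22:33:51.0530009+00:00"),
    ("2024-02-08T22:33:51.05+00:00",  "2024-02-08T22:33:51.056112+00:00"),
};

// Hypothetical helpers: parse an ISO 8601 timestamp, and drop its sub-second part (offset kept).
static DateTimeOffset ParseTime(string s) => DateTimeOffset.Parse(s, CultureInfo.InvariantCulture);
static DateTimeOffset TruncateToSecond(DateTimeOffset t) =>
    t.AddTicks(-(t.Ticks % TimeSpan.TicksPerSecond));

var rows = samples
    .Select(s => (TickSecond: TruncateToSecond(ParseTime(s.Tick)),
                  ReceivedSecond: TruncateToSecond(ParseTime(s.Received))))
    .ToList();

foreach (var r in rows)
    Console.WriteLine($"TickTime second {r.TickSecond:HH:mm:ss}, ReceivedTime second {r.ReceivedSecond:HH:mm:ss}");

// Quotes whose TickTime second differs from their ReceivedTime second.
int mismatches = rows.Count(r => r.TickSecond != r.ReceivedSecond);
Console.WriteLine($"Quotes that cross a second boundary in transit: {mismatches}");
```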

Please help me solve this issue, or at least point me in the right direction.

There are 2 answers

Answer by Oleg Dok:

Since Delay and Buffer rely on a system timer with an average accuracy of about 15 ms, your buffer boundaries will keep sliding from second to second as time goes on.

You need to rely on absolute event times instead. Please see my example below, which does not rely on relative timer intervals, only on absolute times.

    using System;
    using System.Globalization;
    using System.Linq;
    using System.Reactive;
    using System.Reactive.Linq;

    class Program
    {
        static void Main()
        {
            var rnd = new Random();

            // Simulated quote stream: one random value every 300 ms.
            var quotes = Observable
                .Interval(TimeSpan.FromMilliseconds(300))
                .Select(_ => rnd.NextDouble());

            var sub = quotes
                .Timestamp()
                .Do(x => Console.WriteLine($"{x.Timestamp:mm:ss.fff}: {x.Value:N}"))
                .GroupByUntil(
                    // Key: the next whole second as text, e.g. "2024-02-08T22:33:51".
                    q => q.Timestamp.DateTime.AddSeconds(1).ToString("s"),
                    // Each group closes at that absolute (UTC) time.
                    g => Observable.Return(Unit.Default).DelaySubscription(DateTime.Parse(g.Key, null, DateTimeStyles.AssumeUniversal)))
                .Select(group => group.ToList())
                .Concat()
                .Subscribe(list =>
                {
                    if (!list.Any()) return;

                    Console.WriteLine($"{list.First().Timestamp:HH:mm:ss.fff}-{list.Last().Timestamp:ss.fff}: Count: {list.Count}, Open: {list.First().Value:N}, High: {list.Max(x => x.Value):N}, Low: {list.Min(x => x.Value):N}, Close: {list.Last().Value:N}");
                });

            Console.ReadLine();
            sub.Dispose();
        }
    }

Feel free to ask if you need explanations.
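
The key expression in the first example can be checked in isolation. The sketch below (plain .NET; the helper names are hypothetical) computes the end of a quote's second twice: with tick arithmetic, and with the same string round trip as the GroupByUntil key and duration selector. It assumes a +00:00 offset, as with the values produced by Timestamp() in the example; with another offset, `DateTime` would return that offset's clock time and AssumeUniversal would misread it.

```csharp
using System;
using System.Globalization;

// Start of the next whole second, by tick arithmetic (keeps the input's offset).
static DateTimeOffset NextSecondBoundary(DateTimeOffset t) =>
    t.AddTicks(TimeSpan.TicksPerSecond - t.Ticks % TimeSpan.TicksPerSecond);

// The same boundary via the string key used in the GroupByUntil example.
// Assumes t has a +00:00 offset.
static DateTimeOffset NextSecondViaKey(DateTimeOffset t)
{
    // "s" formats as yyyy-MM-ddTHH:mm:ss, so the fraction of t + 1 s is dropped.
    string key = t.DateTime.AddSeconds(1).ToString("s", CultureInfo.InvariantCulture);
    // AssumeUniversal reads the key as UTC; the returned DateTime is local time (Kind = Local).
    DateTime closeAt = DateTime.Parse(key, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal);
    // The implicit DateTime -> DateTimeOffset conversion keeps the same instant.
    return closeAt;
}

var quoteTime = new DateTimeOffset(2024, 2, 8, 22, 33, 50, 102, TimeSpan.Zero);
Console.WriteLine(NextSecondBoundary(quoteTime).ToString("o", CultureInfo.InvariantCulture)); // 2024-02-08T22:33:51.0000000+00:00
Console.WriteLine(NextSecondViaKey(quoteTime) == NextSecondBoundary(quoteTime));          // True
```

Both versions give 22:33:51 for a quote at 22:33:50.102; the tick version simply avoids the string round trip.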

Update (relying only on the "server" timestamp):

    using System;
    using System.Linq;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    class Program
    {
        static void Main()
        {
            var rnd = new Random();

            var quotes = Observable
                .Interval(TimeSpan.FromMilliseconds(125))
                .Select(_ => rnd.NextDouble())
                .Timestamp(); // "server" timestamp

            // Carries "close" signals to the duration selectors of the open groups.
            using var groupLifetimeSink = new Subject<IObservable<int>>();

            using var sub = quotes
                .Do(x => Console.WriteLine($"{x.Timestamp:mm:ss.fff}: {x.Value:N}"))
                .GroupByUntil(
                    q =>
                    {
                        // Publish a signal that fires when this quote's second ends.
                        var leftMilliseconds = 1000 - q.Timestamp.DateTime.Millisecond;
                        groupLifetimeSink.OnNext(
                            Observable
                                .Return(q.Timestamp.DateTime.Second)
                                .DelaySubscription(TimeSpan.FromMilliseconds(leftMilliseconds)));
                        return q.Timestamp.DateTime.Second;
                    },
                    g =>
                    {
                        // A new group (new second) also closes the groups that are still open.
                        groupLifetimeSink.OnNext(Observable.Return(g.Key));
                        return groupLifetimeSink.Merge().DistinctUntilChanged();
                    })
                .Select(group => group.ToList())
                .Concat()
                .Subscribe(list =>
                {
                    if (!list.Any()) return;

                    Console.WriteLine($"{list.First().Timestamp:HH:mm:ss.fff}-{list.Last().Timestamp:ss.fff}: Count: {list.Count}, Open: {list.First().Value:N}, High: {list.Max(x => x.Value):N}, Low: {list.Min(x => x.Value):N}, Close: {list.Last().Value:N}");
                });

            Console.ReadLine();
        }
    }
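
The `leftMilliseconds` computation in the update can be illustrated on one of the question's ReceivedTime values (plain .NET; the helper `TimeUntilNextSecond` is a hypothetical name). `Millisecond` drops the sub-millisecond part, so `1000 - Millisecond` overestimates the remaining time by less than a millisecond; counting in ticks avoids that, although the difference is well below the timer accuracy mentioned at the top of this answer.

```csharp
using System;
using System.Globalization;

// Time left until the next whole second, from ticks (100 ns units) rather than Millisecond,
// so the sub-millisecond part is not dropped.
static TimeSpan TimeUntilNextSecond(DateTimeOffset t) =>
    TimeSpan.FromTicks(TimeSpan.TicksPerSecond - t.Ticks % TimeSpan.TicksPerSecond);

var received = DateTimeOffset.Parse("2024-02-08T22:33:50.1187799+00:00", CultureInfo.InvariantCulture);

int leftMilliseconds = 1000 - received.Millisecond; // as in the example: 1000 - 118 = 882
TimeSpan left = TimeUntilNextSecond(received);       // 8,812,201 ticks, about 881.22 ms

Console.WriteLine($"{leftMilliseconds} ms (Millisecond-based) vs {left.TotalMilliseconds} ms (tick-based)");
```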

One more variant, without grouping, using pure streaming:

    using System;
    using System.Linq;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    class Program
    {
        static void Main()
        {
            var rnd = new Random();

            var quotes = Observable
                .Interval(TimeSpan.FromMilliseconds(125))
                .Select(_ => rnd.NextDouble())
                .Timestamp(); // "server" timestamp

            using var bufferBoundary = new Subject<Unit>();

            quotes
                .Do(x => Console.WriteLine($"{x.Timestamp:mm:ss.fff}: {x.Value:N}"))
                .Scan((previous, current) =>
                {
                    // First quote of a new second: close the current buffer before this quote is passed on.
                    if (previous.Timestamp.ToString("s") != current.Timestamp.ToString("s")) bufferBoundary.OnNext(Unit.Default);
                    return current;
                })
                .Buffer(bufferBoundary.Synchronize())
                .Subscribe(list =>
                {
                    if (!list.Any()) return;

                    Console.WriteLine($"{list.First().Timestamp:HH:mm:ss.fff}-{list.Last().Timestamp:ss.fff}: Count: {list.Count}, Open: {list.First().Value:N}, High: {list.Max(x => x.Value):N}, Low: {list.Min(x => x.Value):N}, Close: {list.Last().Value:N}");
                });

            Console.ReadLine();
        }
    }
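
For comparison, the boundary rule of this last variant can be written as a plain C# iterator without Rx, which makes it easy to test. This is a sketch, not the answer's code: `SplitBySecond` is a hypothetical helper, and it additionally yields empty lists for seconds with no quotes, since the question asks for them. Like the Scan + Buffer version, it can only notice a finished or empty second when a later quote arrives; emitting an empty list at a fixed wall-clock time would still need a timer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Splits arrival-ordered (Time, Value) pairs into one list per whole second of Time.
// A list is closed only when an item from a later second arrives (or the source ends);
// seconds skipped in between produce empty lists.
// Late items (from an older second) are simply added to the currently open list.
static IEnumerable<List<(DateTimeOffset Time, double Value)>> SplitBySecond(
    IEnumerable<(DateTimeOffset Time, double Value)> source)
{
    var current = new List<(DateTimeOffset Time, double Value)>();
    long? openSecond = null;

    foreach (var item in source)
    {
        long second = item.Time.UtcTicks / TimeSpan.TicksPerSecond;

        if (openSecond is long open && second > open)
        {
            yield return current;                                              // the finished second
            for (long gap = open + 1; gap < second; gap++)
                yield return new List<(DateTimeOffset Time, double Value)>(); // seconds with no items
            current = new List<(DateTimeOffset Time, double Value)>();
            openSecond = second;
        }
        else if (openSecond is null)
        {
            openSecond = second;
        }

        current.Add(item);
    }

    if (openSecond is not null)
        yield return current; // flush the last second when the source ends
}

var t0 = new DateTimeOffset(2024, 2, 8, 22, 33, 50, TimeSpan.Zero);
var quotes = new[]
{
    (t0.AddMilliseconds(102), 1.0),
    (t0.AddMilliseconds(999), 2.0),
    (t0.AddMilliseconds(1030), 3.0),
    (t0.AddMilliseconds(3050), 4.0), // nothing arrives during second 52
};

var bars = SplitBySecond(quotes).ToList();
foreach (var bar in bars)
    Console.WriteLine($"{bar.Count} quote(s)");
```

With this input the lists contain 2, 1, 0 and 1 quotes; the empty one stands for second 52.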

Answer by Shlomo:

This should probably solve it for you, or at least lead you down the right path:

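    var observable = streamQuotes.ToObservable()
        // Key: whole seconds of TickTime (ticks since 0001-01-01 divided by ticks per second)
        .GroupBy(q => q.TickTime.Ticks / TimeSpan.TicksPerSecond)
        // For each second, take at most two one-second buffers, then stop listening to that group
        .SelectMany(g => g.Buffer(TimeSpan.FromSeconds(1), Scheduler.Default).Take(2))
        .Subscribe(quotes =>
        {
            Console.Write(JsonConvert.SerializeObject(quotes, Newtonsoft.Json.Formatting.Indented));
        });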

A couple problems with your code:

  1. It looks like you want to buffer / group by the seconds value of your TickTime property. The Buffer(TimeSpan) overload you're invoking (and the equivalent Window overload) cuts buffers at one-second offsets from whenever your subscription starts and data arrives, measured on the scheduler's clock. It doesn't look at your data at all.
  2. You could use Buffer if you manipulate the window openings yourself, but that's more trouble than it's worth.
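
Point 1 can be illustrated with the ReceivedTime values from the question. The sketch below models an idealized Buffer(TimeSpan.FromSeconds(1)) after the 15 ms Delay: each quote goes into the window that contains its delivery time, counted from the moment the timer started. That start time is not given in the question, so `timerStart` below is a hypothetical value. With it, all nine quotes, including the two whose TickTime is in second 51, fall into window 0, which matches the shape of the output in the question.

```csharp
using System;
using System.Globalization;
using System.Linq;

// ReceivedTime values from the question's output.
string[] receivedTimes =
{
    "2024-02-08T22:33:50.1187799+00:00", "2024-02-08T22:33:50.5654973+00:00",
    "2024-02-08T22:33:50.6046997+00:00", "2024-02-08T22:33:50.7155601+00:00",
    "2024-02-08T22:33:50.7658716+00:00", "2024-02-08T22:33:50.866821+00:00",
    "2024-02-08T22:33:51.005728+00:00",  "2024-02-08T22:33:51.0530009+00:00",
    "2024-02-08T22:33:51.056112+00:00",
};

// Hypothetical moment the Buffer timer started; the real value is not known.
var timerStart = DateTimeOffset.Parse("2024-02-08T22:33:50.100+00:00", CultureInfo.InvariantCulture);
var delay = TimeSpan.FromMilliseconds(15); // the Delay(...) in the question

// Idealized Buffer(TimeSpan.FromSeconds(1)): window n covers [start + n s, start + (n + 1) s)
// of delivery time, ignoring timer jitter. TickTime plays no part in the assignment.
long[] windowIndex = receivedTimes
    .Select(s => DateTimeOffset.Parse(s, CultureInfo.InvariantCulture) + delay - timerStart)
    .Select(sinceStart => sinceStart.Ticks / TimeSpan.TicksPerSecond)
    .ToArray();

Console.WriteLine(string.Join(", ", windowIndex)); // 0, 0, 0, 0, 0, 0, 0, 0, 0
```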

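The GroupBy in the solution first groups the data by the tick-based second value. The Buffer then collects each group's quotes into a list. The Take(2) ensures that the subscription to each group created by the GroupBy stays open for at most two one-second buffers, to limit memory use. Note that GroupBy itself keeps every group it has created until the source completes; GroupByUntil is the operator that removes a group when its duration ends.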
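
One detail of the GroupBy key, assuming TickTime is a DateTimeOffset (the JSON shows an offset): `DateTimeOffset.Ticks` counts the clock time in the value's own offset, while `UtcTicks` counts UTC. All timestamps in the question are +00:00, so the key works there, but the same instant reported with different offsets would land in different groups. A minimal check:

```csharp
using System;
using System.Globalization;

// One TickTime from the question, and the same instant expressed with a +01:00 offset.
var utc = DateTimeOffset.Parse("2024-02-08T22:33:50.102+00:00", CultureInfo.InvariantCulture);
var plusOne = utc.ToOffset(TimeSpan.FromHours(1)); // 2024-02-08T23:33:50.102+01:00

// Ticks follows the value's own clock time; UtcTicks follows UTC.
long keyFromTicks = utc.Ticks / TimeSpan.TicksPerSecond;
long keyFromTicksPlusOne = plusOne.Ticks / TimeSpan.TicksPerSecond;
long keyFromUtcTicks = utc.UtcTicks / TimeSpan.TicksPerSecond;
long keyFromUtcTicksPlusOne = plusOne.UtcTicks / TimeSpan.TicksPerSecond;

Console.WriteLine(utc == plusOne);                            // True (same instant)
Console.WriteLine(keyFromTicksPlusOne - keyFromTicks);        // 3600
Console.WriteLine(keyFromUtcTicks == keyFromUtcTicksPlusOne); // True
```

Using `q.TickTime.UtcTicks / TimeSpan.TicksPerSecond` as the key removes that dependence on the offset.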