Why does Buffer from my Observable not receive events?


I have tried several approaches, but my subscription method never gets called:

  • Method 1:

    var buffer = new List<Kpi>();
    buffer.ToObservable().Buffer(TimeSpan.FromMinutes(1), 5).Subscribe(
    async kpis =>
    {
        await _retry.ExecuteAsync(() => Process(kpis.ToList())).ConfigureAwait(false);
    });
    

    Then calling buffer.Add(new Kpi()); doesn't trigger my method.


  • Method 2: (Note: I have read the definition for the special methods Empty/Never/Throw but other than these I can't seem to find a way to create an observable that emits something other than primitive numbers etc.)

    var buffer = Observable.Empty<Kpi>();
    buffer.Buffer(TimeSpan.FromMinutes(1), 5).Subscribe(
    async kpis =>
    {
        await _retry.ExecuteAsync(() => Process(kpis.ToList())).ConfigureAwait(false);
    });
    

    Then I call buffer.Publish(new Kpi()). Again, nothing happens.

Where am I going wrong?


There is 1 answer

Euphoric (best answer)

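In the first case, calling ToObservable on a List won't make the List magically notify subscribers of its changes. List simply does not have that feature: ToObservable only replays the items that are in the list when you subscribe and then completes, so a later Add is never seen.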
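To make the snapshot behaviour concrete, here is a minimal sketch. It assumes the System.Reactive NuGet package is referenced and uses plain int values instead of Kpi so that it is self-contained; the class name ToObservableDemo is made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ToObservableDemo
{
    // Returns the items the subscriber actually received.
    public static List<int> Run()
    {
        var received = new List<int>();
        var source = new List<int> { 1, 2 };

        // ToObservable enumerates the list when Subscribe is called,
        // pushes each element to the observer, then signals completion.
        source.ToObservable().Subscribe(received.Add);

        // This item is added after the sequence has already completed,
        // so the subscriber never sees it.
        source.Add(3);

        return received;
    }
}
```

Calling ToObservableDemo.Run() should give back only the two items that were in the list at subscription time.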

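In the second case, Publish does something completely different from what you are expecting. It is an operator that wraps the source in a connectable observable so that several subscribers can share one subscription; it does not push a value into the sequence. Observable.Empty also completes immediately without ever emitting an item.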

If you want to create an observable that you can push items into yourself, you are looking for the Subject<T> class.

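// Requires: using System.Reactive.Linq; using System.Reactive.Subjects;
var buffer = new Subject<Kpi>();

// A batch is emitted when one minute has elapsed or 5 items have arrived,
// whichever happens first.
buffer.Buffer(TimeSpan.FromMinutes(1), 5).Subscribe(
async kpis =>
{
    await _retry.ExecuteAsync(() => Process(kpis.ToList())).ConfigureAwait(false);
});

// notify of new item
buffer.OnNext(new Kpi());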
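
If you want to see the Subject and Buffer combination working without waiting a minute, a small self-contained sketch follows. It assumes the System.Reactive NuGet package, uses int instead of Kpi, and lowers the count to 2; the class name SubjectBufferDemo is made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SubjectBufferDemo
{
    // Returns every batch that the Buffer operator handed to the subscriber.
    public static List<IList<int>> Run()
    {
        var batches = new List<IList<int>>();
        var subject = new Subject<int>();

        // Emit a batch after one minute or after 2 items, whichever comes first.
        IDisposable subscription = subject
            .Buffer(TimeSpan.FromMinutes(1), 2)
            .Subscribe(batch => batches.Add(batch));

        subject.OnNext(1);
        subject.OnNext(2);     // count reached: the first batch is emitted here
        subject.OnNext(3);
        subject.OnCompleted(); // completion flushes the remaining item as a final batch

        subscription.Dispose();
        subject.Dispose();
        return batches;
    }
}
```

The first batch should contain the two items pushed before the count was reached, and the item left over is delivered when the subject completes.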

There are many ways to create a new observable sequence. I would recommend reading through them to see whether one suits you better, for example turning an event into an observable.
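
As a rough illustration of the event route, the sketch below uses Observable.FromEventPattern from the System.Reactive package. KpiEventArgs, KpiSource and the KpiProduced event are hypothetical types invented for this example, not something from your code, and the sketch assumes a plain console-style environment with no SynchronizationContext installed.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical event payload, made up for this example.
public sealed class KpiEventArgs : EventArgs
{
    public KpiEventArgs(string name) { Name = name; }
    public string Name { get; }
}

// Hypothetical publisher that raises a classic .NET event.
public sealed class KpiSource
{
    public event EventHandler<KpiEventArgs> KpiProduced;

    public void Produce(string name)
    {
        KpiProduced?.Invoke(this, new KpiEventArgs(name));
    }
}

public static class EventToObservableDemo
{
    // Returns the KPI names that were observed while subscribed.
    public static List<string> Run()
    {
        var source = new KpiSource();
        var names = new List<string>();

        // Wrap the event: the first delegate attaches the handler on Subscribe,
        // the second detaches it when the subscription is disposed.
        IObservable<string> kpiNames = Observable
            .FromEventPattern<KpiEventArgs>(
                handler => source.KpiProduced += handler,
                handler => source.KpiProduced -= handler)
            .Select(pattern => pattern.EventArgs.Name);

        using (kpiNames.Subscribe(names.Add))
        {
            source.Produce("latency");
            source.Produce("throughput");
        }

        // Raised after the subscription was disposed, so it is not observed.
        source.Produce("ignored");
        return names;
    }
}
```

From there you can chain Buffer onto kpiNames exactly as with the Subject, without having to call OnNext yourself.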