Processing batched events with Rx Observables in a Hosted Service


I have the following scenario:

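using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class MyHostedService : BackgroundService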
{
    private readonly IServiceProvider _serviceProvider;
    private readonly IEventSource _eventSource;
    private readonly ILogger _logger;

    private IDisposable? _subscription;

    public MyHostedService(
        IServiceProvider serviceProvider,
        IEventSource eventSource,
        ILogger<MyHostedService> logger)
    {
        _serviceProvider = serviceProvider;
        _eventSource = eventSource;
        _logger = logger;
    }

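    // Nothing is awaited here: the Rx pipeline keeps running after Subscribe returns,
    // so the method returns a completed task instead of being marked async.
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _subscription = _eventSource.CreateEventsObservable()
            // Emit a batch every second, or as soon as 100 events have arrived.
            .Buffer(TimeSpan.FromSeconds(1), 100)
            .Where(batch => batch.Count is not 0)
            .Select((pes, batchNumber) => Observable.FromAsync(() => ProcessBatch(batchNumber, pes, stoppingToken)))
            // Concat processes the batches strictly one after another.
            .Concat()
            .Subscribe(
                onNext: (u) => { },
                onError: (e) => _logger.LogSubscriptionFailure(e),
                onCompleted: () => _logger.LogCompletedSubscription()
            );

        return Task.CompletedTask;
    }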

    private async Task ProcessBatch(
        int batchNumber, 
        IList<MyEvent> events,
        CancellationToken cancellationToken)
    {
        try
        {
            using var scope = _serviceProvider.CreateScope();
            var processor = scope.ServiceProvider.GetRequiredService<IProcessor>();
            await processor.Process(events, cancellationToken);

            _logger.LogProcessedBatch(batchNumber, events.Count);
        }
        catch (Exception e) when (e is not OperationCanceledException)
        {
            _logger.LogBatchError(e, batchNumber, events.Count);
        }
    }

    public override async Task StopAsync(CancellationToken stoppingToken) => await base.StopAsync(stoppingToken);

    public override void Dispose()
    {
        _subscription?.Dispose();
        base.Dispose();
    }
}

There are a few things that bother me:

  1. I noticed that processing is a bit slow: even though the source observable (_eventSource) produces lots of events, the batches don't arrive very quickly. Could I improve the throughput here? Maybe ProcessBatch should use Task.Run internally to run the processor?
  2. There are lots of events. Since IProcessor has some transient dependencies (like typed HTTP clients), I don't think I should just inject IProcessor once and use that instance for the whole lifetime of the app. Instead, I create a new scope every time a batch is processed. Is that OK for performance? Could it be done better?

Answer by Enigmativity

It's always best to have a full Rx solution. Calling Subscribe in your ExecuteAsync is not the best way to go.

You can await an observable, you know: the await finishes when the sequence completes and returns its last value.
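A minimal sketch of that behaviour, assuming the System.Reactive NuGet package (the class and method names are made up for the demo): awaiting an `IObservable<T>` waits for `OnCompleted` and returns the last element, while awaiting a sequence that completes without any element throws `InvalidOperationException`. The second point matters for a long-running `ExecuteAsync`: if the awaited sequence is cut short before it produces a value, the await throws instead of returning.

```csharp
// Minimal sketch; assumes the System.Reactive NuGet package is referenced.
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class AwaitObservableDemo
{
    public static async Task<int> LastOfRangeAsync()
    {
        // Awaiting subscribes, waits for OnCompleted and returns the last element (3).
        return await Observable.Range(1, 3);
    }

    public static async Task<bool> EmptyThrowsAsync()
    {
        try
        {
            // An empty sequence has no last element, so awaiting it throws.
            await Observable.Empty<int>();
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    public static async Task Main()
    {
        Console.WriteLine(await LastOfRangeAsync()); // prints 3
        Console.WriteLine(await EmptyThrowsAsync()); // prints True
    }
}
```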

protected override async Task ExecuteAsync(CancellationToken stoppingToken) =>
    await
        Observable
            .Using(
                () => _serviceProvider.CreateScope(),
                scope =>
                    from es in _eventSource.CreateEventsObservable().Buffer(TimeSpan.FromSeconds(1), 100)
                    where es.Count > 0
                    let processor = scope.ServiceProvider.GetRequiredService<IProcessor>()
                    from r in Observable.FromAsync(ct => processor.Process(es, ct))
                    select r)
            .Do(u => { }, (e) => _logger.LogSubscriptionFailure(e), () => _logger.LogCompletedSubscription())
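            // Stop on shutdown. DefaultIfEmpty keeps the awaited sequence from being empty,
            // because awaiting an empty observable throws InvalidOperationException.
            .TakeUntil(stoppingToken.ToObservable())
            .DefaultIfEmpty();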

That's close. Not quite your original query, but it should do the same processing and it's only creating one scope.
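One difference worth knowing: in query syntax the second `from` compiles to `SelectMany`, which subscribes to each `FromAsync` as soon as its batch arrives, so several batches can be in flight at once, whereas `Select(...).Concat()` in the question processes them one at a time. The sketch below contrasts the two, assuming the System.Reactive package; `ConcurrencyDemo` and `ProcessAsync` are hypothetical stand-ins for the service and `IProcessor.Process`, and the delay merely simulates work.

```csharp
// Sketch: SelectMany (query syntax) versus Select + Concat. Assumes System.Reactive;
// ProcessAsync is a hypothetical stand-in for IProcessor.Process.
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ConcurrencyDemo
{
    private static readonly object Gate = new object();
    private static int _inFlight;
    private static int _maxInFlight;

    // Hypothetical processor: records how many calls overlap in time.
    private static async Task ProcessAsync(IList<int> batch, CancellationToken ct)
    {
        lock (Gate)
        {
            _inFlight++;
            _maxInFlight = Math.Max(_maxInFlight, _inFlight);
        }

        try
        {
            await Task.Delay(50, ct); // simulated work on the batch
        }
        finally
        {
            lock (Gate) { _inFlight--; }
        }
    }

    // Returns the peak number of batches being processed at the same time.
    public static async Task<int> MaxInFlightAsync(bool sequential)
    {
        lock (Gate) { _inFlight = 0; _maxInFlight = 0; }

        // Five batches of two items, all available immediately.
        IObservable<IList<int>> batches = Observable.Range(0, 10).Buffer(2);

        IObservable<Unit> processed;
        if (sequential)
        {
            // Concat subscribes to the next FromAsync only after the previous one completes.
            processed = batches
                .Select(b => Observable.FromAsync(ct => ProcessAsync(b, ct)))
                .Concat();
        }
        else
        {
            // SelectMany subscribes to each FromAsync as soon as its batch arrives.
            processed =
                from b in batches
                from r in Observable.FromAsync(ct => ProcessAsync(b, ct))
                select r;
        }

        await processed; // completes after every batch has been processed
        lock (Gate) { return _maxInFlight; }
    }

    public static async Task Main()
    {
        Console.WriteLine($"SelectMany peak: {await MaxInFlightAsync(sequential: false)}");
        Console.WriteLine($"Concat peak: {await MaxInFlightAsync(sequential: true)}");
    }
}
```

If `IProcessor` is safe to call concurrently, the overlapping form is one possible throughput lever; `Merge(maxConcurrent)` in place of `Concat()` would cap how many batches run at once.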

You need this extension method:

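using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;

public static class ObservableEx2
{
    // Emits a single Unit and completes when the token is cancelled;
    // disposing the subscription unregisters the callback.
    public static IObservable<Unit> ToObservable(this CancellationToken ct) =>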
        Observable.Create<Unit>(observer => ct.Register(() =>
        {
            observer.OnNext(Unit.Default);
            observer.OnCompleted();
        }));
}
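A usage sketch for the extension, assuming the System.Reactive package. `Observable.Interval` stands in for the never-ending event source, and `StopOnCancelDemo` is a made-up name. The extension is repeated so the block compiles on its own; a real project would keep only one copy. Applying `TakeUntil` before the await, with `DefaultIfEmpty` as a guard, lets a cancelled token end the await normally even when nothing was produced.

```csharp
// Sketch; assumes System.Reactive. StopOnCancelDemo is a hypothetical name.
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ObservableEx2
{
    // Same extension as above: emits one Unit and completes when ct is cancelled.
    public static IObservable<Unit> ToObservable(this CancellationToken ct) =>
        Observable.Create<Unit>(observer => ct.Register(() =>
        {
            observer.OnNext(Unit.Default);
            observer.OnCompleted();
        }));
}

public static class StopOnCancelDemo
{
    // Returns the last tick seen before `stop` fired, or -1 if there was none.
    public static async Task<long> LastTickBeforeStopAsync(CancellationToken stop) =>
        await Observable.Interval(TimeSpan.FromMilliseconds(10)) // endless source
            .TakeUntil(stop.ToObservable())
            // Without this, stopping before the first tick would make the await throw.
            .DefaultIfEmpty(-1L);

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));
        long last = await LastTickBeforeStopAsync(cts.Token);
        Console.WriteLine($"Stopped after tick {last}");
    }
}
```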