.NET Core AsyncLocal loses context with System.Reactive

I want to use AsyncLocal to pass information through async workflows for tracing purposes. I have now run into a problem with Rx.
This is my test code:

using System;
using System.Reactive.Concurrency; // only needed if ObserveOn(Scheduler.Default) is uncommented
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public class RxTest
{
    private readonly Subject<int> test = new Subject<int>();

    private readonly AsyncLocal<int> asyncContext = new AsyncLocal<int>();

    public void Test()
    {
        this.test
            // .ObserveOn(Scheduler.Default)
            .Subscribe(this.OnNextNormal);
        this.test
            // .ObserveOn(Scheduler.Default)
            .Delay(TimeSpan.FromMilliseconds(1))
            .Subscribe(this.OnNextDelayed);

        for (var i = 0; i < 2; i++)
        {
            var index = i;
            Task.Run(() =>
            {
                this.asyncContext.Value = index;
                Console.WriteLine(
                    $"Main\t\t{index} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
                this.test.OnNext(index);
            });
        }

        Console.ReadKey();
    }

    private void OnNextNormal(int obj)
    {
        Console.WriteLine(
            $"OnNextNormal\t{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
    }

    private void OnNextDelayed(int obj)
    {
        Console.WriteLine(
            $"OnNextDelayed\t{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
    }
}

Output is:

Main 0 (Thread: 5): AsyncLocal.Value => 0
Main 1 (Thread: 6): AsyncLocal.Value => 1
OnNextNormal 0 (Thread: 5): AsyncLocal.Value => 0
OnNextNormal 1 (Thread: 6): AsyncLocal.Value => 1
OnNextDelayed 0 (Thread: 4): AsyncLocal.Value => 0
OnNextDelayed 1 (Thread: 4): AsyncLocal.Value => 0

As you can see, AsyncLocal.Value does not flow to the delayed subscriber: OnNextDelayed reports 0 for both items.
=> The AsyncLocal value gets lost on the delayed track

As far as I understand, a plain Subscribe() uses no scheduler, whereas Delay() does.
When I use ObserveOn() for both subscriptions, the output is as follows:

Main 0 (Thread: 5): AsyncLocal.Value => 0
Main 1 (Thread: 7): AsyncLocal.Value => 1
OnNextNormal 0 (Thread: 9): AsyncLocal.Value => 0
OnNextNormal 1 (Thread: 9): AsyncLocal.Value => 0
OnNextDelayed 0 (Thread: 4): AsyncLocal.Value => 0
OnNextDelayed 1 (Thread: 4): AsyncLocal.Value => 0

=> The AsyncLocal value gets lost on every track
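
The same effect can be reproduced without Rx. The following is a minimal sketch, assuming the scheduler behaves roughly like a worker that drains a queue of notifications; it is an analogy, not Rx's actual implementation, and the names ContextCaptureDemo and Ambient are made up for illustration. The worker thread keeps the ExecutionContext it had when it was started, so it never sees the values the producers set:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo type; not part of Rx or the original question.
public static class ContextCaptureDemo
{
    private static readonly AsyncLocal<int> Ambient = new AsyncLocal<int>();

    // Returns (item, AsyncLocal value observed by the worker) pairs.
    public static List<(int Item, int Seen)> Run()
    {
        var queue = new BlockingCollection<int>();
        var seen = new List<(int Item, int Seen)>();

        // The worker's ExecutionContext is captured when the thread is started,
        // before any producer has assigned Ambient.Value.
        var worker = new Thread(() =>
        {
            foreach (var item in queue.GetConsumingEnumerable())
            {
                seen.Add((item, Ambient.Value));
            }
        });
        worker.Start();

        var producers = new Task[2];
        for (var i = 0; i < producers.Length; i++)
        {
            var index = i + 1; // 1 and 2, so a lost value (0) is easy to spot
            producers[i] = Task.Run(() =>
            {
                Ambient.Value = index; // visible only in this producer's context
                queue.Add(index);      // the item crosses threads, the context does not
            });
        }

        Task.WaitAll(producers);
        queue.CompleteAdding();
        worker.Join();
        return seen;
    }
}
```

Every entry returned by Run() has Seen equal to 0, which mirrors the OnNextDelayed lines above: the items arrive, but the publisher's AsyncLocal value does not.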

Is there a way to let the ExecutionContext flow with Rx?
I only found this, but there the problem is the other way around: they solved how the observer's context flows, whereas I want the publisher's context to flow.

What I want to achieve is this:

  1. Message from "outside" comes to my service
  2. Distribute message within service (RX)
  3. When logging a message, format the log message with MessageId
  4. I do not want to pass the message everywhere

Thanks in advance for your answers.

There is 1 answer

Answered by Asti

The free-flowing execution context in Rx is what makes it great for most multi-threaded scenarios. You could preserve the publisher's context by bypassing the scheduler-based operators, like so:

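using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class Extensions
{
    // Delays each value with Task.Delay instead of an Rx scheduler.
    // ContinueWith is called on the publisher's thread, so the continuation
    // runs with the ExecutionContext that was current at OnNext time.
    public static IObservable<T> TaskPoolDelay<T>(this IObservable<T> observable, TimeSpan delay)
    {
        return Observable.Create<T>(
            observer => observable.Subscribe(
                onNext: value => Task.Delay(delay).ContinueWith(_ => observer.OnNext(value)),
                onError: observer.OnError,
                onCompleted: observer.OnCompleted
            )
        );
    }
}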

Output:

OnNextDelayed   2 (Thread: 6): AsyncLocal.Value => 2
OnNextDelayed   3 (Thread: 10): AsyncLocal.Value => 3
OnNextDelayed   1 (Thread: 7): AsyncLocal.Value => 1
OnNextDelayed   0 (Thread: 5): AsyncLocal.Value => 0
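
The reason this works can be shown in isolation. Below is a minimal sketch, assuming default ExecutionContext flow (no SuppressFlow); ContinuationFlowDemo and Ambient are illustrative names only. A continuation registered with ContinueWith captures the ExecutionContext of the code that registers it, so each continuation sees the AsyncLocal value of its own producer even though it runs later on some thread-pool thread:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo type; not part of Rx or the answer's code.
public static class ContinuationFlowDemo
{
    private static readonly AsyncLocal<int> Ambient = new AsyncLocal<int>();

    // Returns the AsyncLocal value observed inside each delayed continuation.
    public static int[] Run()
    {
        var tasks = new Task<int>[3];
        for (var i = 0; i < tasks.Length; i++)
        {
            var index = i + 1;
            tasks[i] = Task.Run(() =>
            {
                Ambient.Value = index;
                // The context (including Ambient == index) is captured here,
                // at registration time, and restored when the continuation runs.
                return Task.Delay(10).ContinueWith(_ => Ambient.Value);
            });
        }

        return Task.WhenAll(tasks).GetAwaiter().GetResult();
    }
}
```

Run() returns the values 1, 2 and 3 in task order, regardless of which threads execute the continuations.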

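This does carry the context forward, but it quickly gets complicated for larger queries, because every scheduler-based operator would need a replacement like this. I'm not sure whether implementing an IScheduler that preserves the context when it notifies would work well. If copying the message (or the data you need from it) along with each notification isn't too much overhead, that might be the best fit for Rx.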
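
One way to carry the context along with each notification is sketched below. This is only an illustration under assumptions: it targets .NET Core (where a captured ExecutionContext can be run more than once), and Flowed<T>, Flowed.Capture and Run are hypothetical names, not part of Rx. The idea is to capture the publisher's ExecutionContext next to the value and restore it just around the handler:

```csharp
using System;
using System.Threading;

// Hypothetical helper: a value paired with the ExecutionContext
// that was current when the value was published.
public readonly struct Flowed<T>
{
    public Flowed(T value, ExecutionContext context)
    {
        Value = value;
        Context = context;
    }

    public T Value { get; }

    // Null if ExecutionContext flow was suppressed at capture time.
    public ExecutionContext Context { get; }

    // Invokes the handler under the captured context; the caller's own
    // context is restored when ExecutionContext.Run returns.
    public void Run(Action<T> handler)
    {
        if (Context == null)
        {
            handler(Value);
            return;
        }

        var value = Value; // copy to a local: lambdas in a struct cannot use 'this'
        ExecutionContext.Run(Context, _ => handler(value), null);
    }
}

public static class Flowed
{
    // Call this on the publishing side, before any scheduler hop.
    public static Flowed<T> Capture<T>(T value) =>
        new Flowed<T>(value, ExecutionContext.Capture());
}
```

With the code from the question this would presumably be wired up as this.test.Select(v => Flowed.Capture(v)).Delay(TimeSpan.FromMilliseconds(1)).Subscribe(f => f.Run(this.OnNextDelayed)). Since Subject.OnNext calls Select synchronously on the publisher's thread, the capture should see the publisher's AsyncLocal value, and the handler and any logging inside it then run under that context without the message being passed around explicitly.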