How to use AsyncLocalStorage for an Observable?


I'd like to use AsyncLocalStorage in a NestJS interceptor:

export interface CallHandler<T = any> {
    handle(): Observable<T>;
}
export interface NestInterceptor<T = any, R = any> {
    intercept(context: ExecutionContext, next: CallHandler<T>): Observable<R> | Promise<Observable<R>>;
}

The interceptor function gets a next CallHandler that returns an Observable.

I cannot use run in this case (the run callback will exit immediately before the callHandler.handle() observable has finished):

  intercept(context: ExecutionContext, callHandler: CallHandler): Observable<any> | Promise<Observable<any>> {
    const asyncLocalStorage = new AsyncLocalStorage();
    const myStore = {  some: 'data'};
    return asyncLocalStorage.run(myStore, () => callHandler.handle());
  }

See broken replit-example
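The failure mode can be reproduced without NestJS or RxJS. The following is a minimal sketch, not the actual interceptor: `Lazy` and `handle` are hypothetical stand-ins for a cold Observable and for `callHandler.handle()`, and only `AsyncLocalStorage` from Node's `async_hooks` module is a real API here. It shows that the store is only visible when the subscription itself happens inside `run`:

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

const als = new AsyncLocalStorage<{ some: string }>();

// Hypothetical stand-in for a cold Observable: nothing executes until subscribe() is called.
interface Lazy<T> {
  subscribe(next: (value: T) => void): void;
}

// Hypothetical stand-in for callHandler.handle(): reads the store at subscription time.
function handle(): Lazy<string | undefined> {
  return { subscribe: (next) => next(als.getStore()?.some) };
}

// Only the creation happens inside run(); the subscription happens after run() has returned.
function subscribeOutsideRun(): string | undefined {
  const lazy = als.run({ some: "data" }, () => handle());
  let seen: string | undefined;
  lazy.subscribe((value) => { seen = value; });
  return seen;
}

// Creation and subscription both happen inside run().
function subscribeInsideRun(): string | undefined {
  let seen: string | undefined;
  als.run({ some: "data" }, () => handle().subscribe((value) => { seen = value; }));
  return seen;
}
```

In this sketch `subscribeOutsideRun()` does not see the store, while `subscribeInsideRun()` does.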

The solution I came up with is this:

const localStorage = new AsyncLocalStorage();

export class MyInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, callHandler: CallHandler): Observable<any> | Promise<Observable<any>> {
    const resource = new AsyncResource('AsyncLocalStorage', { requireManualDestroy: true });
    const myStore = { some: 'data' };

    localStorage.enterWith(myStore);
    return callHandler.handle().pipe(
      finalize(() => resource.emitDestroy())
    );
  }
}

See working replit example
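For comparison, here is a small sketch of how `run` and `enterWith` differ in isolation. It assumes the behaviour described in the Node.js documentation, namely that `enterWith` keeps the store set for the remainder of the current synchronous execution, whereas `run` scopes it to the callback. The function names are made up for illustration:

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

const als = new AsyncLocalStorage<string>();

// run(): the store is visible only inside the callback (and in async work started there).
function storeAfterRun(): string | undefined {
  als.run("from run", () => {
    // als.getStore() would return "from run" here
  });
  return als.getStore();
}

// enterWith(): the store is still set after the call returns.
function storeAfterEnterWith(): string | undefined {
  als.enterWith("from enterWith");
  return als.getStore();
}
```

Because the store set by `enterWith` is not confined to a callback, it can remain visible to code that runs later in the same synchronous execution, which is one reason this approach can feel error-prone.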

This seems to work fine, but I am not sure if this is really correct - and it looks messy and error-prone. So I wonder:

  1. Is this correct at all?
  2. Is there a better/cleaner way to handle this?

There are 4 answers

TmTron On

Here is our current solution to the problem:

  • we create an observable that simply forwards all emissions of the callHandler to its subscriber
  • the important part is that we subscribe inside the localStorage.run callback
const localStorage = new AsyncLocalStorage();

export class MyInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, callHandler: CallHandler): Observable<any> | Promise<Observable<any>> {
    const myStore = { some: 'data' };

    return new Observable((subscriber) => {
      const subscription = localStorage.run(myStore, () => {
        /**
         * - run the handler function in the run callback, so that myStore is set
         * - subscribe to the handler and pass all emissions of the callHandler to our subscriber
         */
        return callHandler.handle().subscribe(subscriber);
      });
      /**
       * return an unsubscribe method
       */
      return () => subscription.unsubscribe();
    });
  }
}
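The same idea can be written as a small reusable helper. This is only a sketch under stated assumptions: `Source`, `Teardown` and `withStore` are hypothetical names, and `Source` is a minimal stand-in for an RxJS Observable so that the example runs with Node's standard library alone. It relies on `AsyncLocalStorage.run` returning whatever its callback returns, which lets the teardown function pass straight through:

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

// Hypothetical minimal observable shape (a stand-in for RxJS's Observable).
type Teardown = () => void;
interface Source<T> {
  subscribe(next: (value: T) => void): Teardown;
}

// Wrap a source so that every subscription to it is started inside als.run(store, ...).
function withStore<S, T>(als: AsyncLocalStorage<S>, store: S, source: Source<T>): Source<T> {
  return {
    // run() returns the callback's return value, so the inner teardown is handed back to the caller.
    subscribe: (next) => als.run(store, () => source.subscribe(next)),
  };
}

const als = new AsyncLocalStorage<{ some: string }>();
const log: string[] = [];

// Hypothetical handler: emits the value found in the store when it is subscribed to.
const handler: Source<string> = {
  subscribe(next) {
    next(als.getStore()?.some ?? "no store");
    return () => log.push("unsubscribed");
  },
};

const wrapped = withStore(als, { some: "data" }, handler);
const stop = wrapped.subscribe((value) => log.push(value));
stop();
```

With RxJS, the wrapper would be a `new Observable(...)` as in the answer above; the stand-in only keeps the sketch free of external dependencies.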

ajb On

Below is the solution I came up with. As I understand the problem, the run function needs a callback that fully encapsulates the execution of the handler, but the intercept function is expected to return an observable that has not yet been subscribed to. If you only create the observable inside the run callback, it will not have been triggered by the time the callback returns.

My solution, below, is to return a new observable that, when triggered, is responsible for triggering (i.e. subscribing to) the call handler itself. As a result, the promise we create in the run call can fully encapsulate the handle function and its async callbacks.

Here is the general functionality in a stand-alone function so that you can see it all together:

intercept(context: ExecutionContext, next: CallHandler<any>): Observable<any> {
    return new Observable((subscribe) => {
        // Subscribe inside run() so the handler executes with the store set.
        asyncStorage.run({}, () => new Promise<void>(resolve => {
            next.handle().subscribe(
                result => {
                    subscribe.next(result);
                    subscribe.complete();
                    resolve();
                },
                err => {
                    // Forward the error to the outer observable; the promise itself never rejects.
                    subscribe.error(err);
                    resolve();
                }
            );
        }));
    });
}

Next, I took that concept and integrated it into my interceptor below.

export class RequestContextInterceptor implements NestInterceptor {
    constructor(
        private readonly requestContext: RequestContext,
        private readonly localStorage: AsyncLocalStorage<RequestContextData>
    ) {}

    intercept(context: ExecutionContext, next: CallHandler<any>): Observable<any> {
        const contextData = this.requestContext.buildContextData(context);
        return new Observable((subscribe) => {
            void this.localStorage.run(contextData, () => this.runHandler(next, subscribe));
        });
    }

    private runHandler(next: CallHandler<any>, subscribe: Subscriber<any>): Promise<void> {
        return new Promise<void>((resolve) => {
            next.handle().subscribe(
                (result) => {
                    subscribe.next(result);
                    subscribe.complete();
                    resolve();
                },
                (err) => {
                    subscribe.error(err);
                    resolve();
                }
            );
        });
    }
}

It's worth noting that the Promise created during the run call does not have a rejection path. This is intentional: the error is passed on to the observable that wraps the promise. The outer observable therefore still succeeds or errors depending on what the inner observable does, while the promise that wraps the inner observable always resolves.

Isaac Newton On

Here is a solution for cls-hooked:

return new Observable(observer => {
  namespace.runAndReturn(async () => {
    namespace.set("some", "data")   
    next.handle()
        .subscribe(
          res => observer.next(res), 
          error => observer.error(error),
          () => observer.complete()
        )
  })
})
Spawnrider On

There was an issue related to this: https://github.com/nestjs/nest/pull/11142.
AsyncLocalStorage will be supported in NestJS 10.0.0.