How to update part of a combined Observable manually?

57 views Asked by At

To make things easier to illustrate, I have a combined Observable combinedNotifications$ that I bind to the async pipe. It works nice and well for the first load. HOWEVER. I would like to be able to update notifications$ ONLY as well manually. So far, I never delved too much into observables as the use-cases were even simpler than this one so far.

  combinedNotifications$: Observable<any> = combineLatest([
    this.notifications$,
    this.notificationTypes$
  ]).pipe(
    map((i: any) => ({
      tableData: i[0],
      notificationTypes: i[1]
    })),
    map((f: any) => {
      return ({
        tableData: {
          ...f.tableData, content: f.tableData.content.map((v: any) => ({
            ...v,
            type: f.notificationTypes.eventTypes.find((x: any) => x.value === v.type)?.label,
            cause: f.notificationTypes.causeTypes.find((x: any) => x.value === v.cause)?.label
          }))
        },
        notificationTypes: f.notificationTypes
      })
    }),
    tap((d) => console.log(d)),
    shareReplay(1)
  );

I have a table that I interact with via pagination. Before I created this combined observable, I used to be able to update just the notifications:

retrieveNotificationsPage(queryArgs: Pageable = {}, searchData: any = {}) {
    this._loading$.next(true);
    const actionAuditLogsPage$ = this.apiService
      .retrieveNotificationsView$(queryArgs, { ...searchData })
      .pipe(
        delay(300),
        tap({
          complete: () => {
            this._loading$.next(false);
          },
          error: (err) => {
            this._loading$.next(false);
            throw err;
          }
        }),
        catchError((err) => of({ error: err })),
        // shareReplay()
      );
    return actionAuditLogsPage$;
  }

Which then got called somewhat like this...

 loadRecords(event: LazyLoadEvent) {
    ...
    this.notifications$ = this.retrieveNotificationsPage({
      page: isNaN(selectedPage) ? 0 : selectedPage,
      ...(maxPageRowSize != null ? { size: maxPageRowSize } : {})
    }, this.searchForm.value);
  }

So I thought I could probably do it with merge, but it only seems to be stuck in an infinite loop...

this.combinedNotifications$ = merge(notificationPage$, this.combinedNotifications$);
2

There are 2 answers

1
Shlang On BEST ANSWER

If I understand correctly, you want notificationTypes and notifications to be loaded once on subscribe (via async pipe) and then continue loading notifications on demand.

There are a few ways to do this that differ in detail, here is an example how this can be done:

export class App {
  // this thing we want to load only once on page load
  private notificationTypes$ = this.retrieveNotificationTypes();
  // this thing we want to load once on page load and then on some action
  private notifications$ = new BehaviorSubject<Observable<Notification>>(
    // calling the method does not send a request immediately
    // but returns an observable that sends a request when subscribed
    this.retrieveNotificationsPage()
  );

  combinedNotifications$: Observable<any> = combineLatest({
    // pipe to switchMap "unwraps" an observable,
    // you may want to change it to concatMap or mergeMap depending on requerments
    notifications: this.notifications$.pipe(switchMap(identity)),
    notificationTypes: this.notificationTypes$,
  }).pipe(
    map((value) => JSON.stringify(value)), // Just making it readable in the template
    shareReplay(1) // ensures we subscribe only once
  );

  loadRecords() {
    this.notifications$.next(this.retrieveNotificationsPage());
  }

  private retrieveNotificationsPage(): Observable<Notification> {
    return of({ type: 'default', id: currentNotificationId++ }).pipe(
      delay(1000),
      tap(() => console.log('Notifications loaded'))
    );
  }

  private retrieveNotificationTypes(): Observable<string[]> {
    return of(['default']).pipe(
      delay(2000),
      tap(() => console.log('Types loaded'))
    );
  }
}

The key thing is using a Subject, in my case I used BehaviorSubject because I can put an initial value there. And it is a Subject that emits observables, that then are unwrapped with switchMap(identity) and make the actual calls.

Alternatively, the Subject may emit objects representing parameters for the API calls, so that then they can be transformed as switchMap(params => makeACall(...params)).

Here is a playground with the code from above.

4
Lasithe On

It seems like you want to update this.notifications$ manually and ensure that combinedNotifications$ reflects these changes without affecting the original behavior of combinedNotifications$ when it updates due to changes in notifications$ and notificationTypes$.

To achieve this, you can use the startWith operator to emit an initial value for combinedNotifications$ and then use the merge operator to merge the stream of manual updates with the original combinedNotifications$ stream. Then, you can use the distinctUntilChanged operator to ensure that only distinct values are emitted to avoid infinite loops.

import { merge, of } from 'rxjs';
import { startWith, distinctUntilChanged, switchMap } from 'rxjs/operators';

// Assuming notificationPage$ is the observable returned by retrieveNotificationsPage

// Manual update trigger
const manualUpdate$ = new Subject<void>();

// Observable to handle manual updates
const manualUpdateNotifications$ = manualUpdate$.pipe(
  switchMap(() => this.retrieveNotificationsPage())
);

// Combine manual updates with original notifications$
const updatedNotifications$ = merge(
  manualUpdateNotifications$,
  this.notifications$.pipe(
    // Filter out initial emission
    startWith(null),
    // Distinct until changed to prevent infinite loops
    distinctUntilChanged()
  )
);

// Recreate combinedNotifications$ using the updated notifications$
this.combinedNotifications$ = combineLatest([
  updatedNotifications$,
  this.notificationTypes$
]).pipe(
  map((i: any) => ({
    tableData: i[0],
    notificationTypes: i[1]
  })),
  map((f: any) => ({
    tableData: {
      ...f.tableData,
      content: f.tableData.content.map((v: any) => ({
        ...v,
        type: f.notificationTypes.eventTypes.find((x: any) => x.value === v.type)?.label,
        cause: f.notificationTypes.causeTypes.find((x: any) => x.value === v.cause)?.label
      }))
    },
    notificationTypes: f.notificationTypes
  })),
  tap((d) => console.log(d)),
  shareReplay(1)
);

// Function to trigger manual updates
triggerManualUpdate() {
  manualUpdate$.next();
}

With this setup, this.combinedNotifications$ will reflect changes from both automatic updates (due to changes in notifications$ and notificationTypes$ and manual updates triggered by calling triggerManualUpdate(). Each update will be distinct, preventing infinite loops.