Configure an Observable to return all previous values as an array when a new value is pushed?


I'm having a hard time with this. I'm new to working with RxJS Observables, so I need some guidance.

I'm trying to build an Observable logging stream that does two things.

  1. Whenever a new line/value is written to the log file, push that new value to the stream.
  2. Begin pre-populated with values from the log file.

I've met both of the above criteria. The challenge now is using the stream with *ngFor.

*ngFor needs the Observable to emit the full array so it can compare items and add or remove elements (my best guess). But my observable only emits an array containing the last item pushed through.

//logviewer.page.ts constructor()
this.logs = Subject.create();
this.logs$ = this.logs.asObservable()
            .startWith("logs\\error.log")
            .flatMap((fileName: string) => {
                //start by reading the existing log files as a string
                return this.$localStorageService.readAsStringAsync(fileName);
            })
            .map((contents: string) => {
                //split the log file line-by-line, drop empty lines,
                //and parse each remaining line into a log entry
                return contents.split(/\r\n|\r|\n/)
                    .filter(n => n.length > 0)
                    .map(s => {
                        let x = JSON.parse(s);
                        return { timestamp: new Date(parseFloat(x[0])), message: x[1] };
                    }); //an array of objects { timestamp, message }
            })
            //merge the existing application log stream
            //throughout the application we log errors, info, etc
            //if a new entry is made it will appear here
            .merge(this.$loggerService.applicationLog$.map((x) => {                    
                //return an array with one object { timestamp, message }
                return [{ timestamp: new Date(parseFloat(x[0])), message: x[1] }];
            }))

My template is very simple for now.

//logviewer.template.ts
<div *ngFor="let entry of logs$ | async">
    {{entry|json}}
</div>

To test it, I have a button that adds an entry:

//logviewer.page.ts
addEntry() {
    this.$loggerService.error("this is a test");
}

//LoggerService.service.ts
private applicationLog: ReplaySubject<any[]>;
get applicationLog$(): Observable<any[]> {
    return this.applicationLog.asObservable();
}

error(...args) {
    let data = [Date.now().toString()].concat(args.map<string>((n, ix) => { return toString(n); }));

    // ... write to file

    // fire the subject
    this.applicationLog.next(data);
}

Now when I click addEntry, the plumbing all works, and the value is fired through the observable sequence correctly. But my *ngFor only updates with a single value. It doesn't keep the history of all the previous log entries. Just the last array returned, which makes sense.

How do I make my observable sequence always emit an array of all values? I can make it emit one entry at a time, but I need the full history to satisfy *ngFor.

I misunderstood how *ngFor and the async pipe work together. I thought the pipe would subscribe to the observable and automatically append each new entry to the *ngFor, but that isn't the case: the template only ever sees the most recently emitted value.

Answer by Sergey Sokolov (best answer)

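Try the scan operator. It keeps an accumulator across emissions, so each incoming array can be appended to the history and the whole history is emitted every time: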

this.logs$ = this.logs.asObservable()
        ...
        .merge(this.$loggerService.applicationLog$.map((x) => {                    
            //return an array with one object { timestamp, message }
            return [{ timestamp: new Date(parseFloat(x[0])), message: x[1] }];
        }))
        .scan((acc, x) => {
            acc.push(...x);
            return acc;
        }, []);
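
To make the accumulation step easier to see, here is a minimal sketch that models what scan does to the stream, without using RxJS at all. The names `LogEntry`, `scanHistory`, `fromFile` and `fromLogger` are hypothetical and only for illustration. It also builds a new array on each step instead of pushing into the accumulator, which is a variation on the code above, not a requirement of scan.

```typescript
// Dependency-free model of scan semantics. This is NOT RxJS, just an
// illustration of the values a scan-based stream would emit over time.
interface LogEntry {
  timestamp: Date;
  message: string;
}

// Takes the batches in the order they would be emitted and returns the
// accumulated history as it stands after each batch.
function scanHistory(emissions: LogEntry[][]): LogEntry[][] {
  const snapshots: LogEntry[][] = [];
  let acc: LogEntry[] = [];
  for (const batch of emissions) {
    // Build a new array rather than mutating the previous snapshot.
    acc = [...acc, ...batch];
    snapshots.push(acc);
  }
  return snapshots;
}

// First emission: entries parsed from the existing log file (made-up data).
const fromFile: LogEntry[] = [
  { timestamp: new Date(1000), message: "first" },
  { timestamp: new Date(2000), message: "second" },
];

// Second emission: a single new entry pushed by the logger service.
const fromLogger: LogEntry[] = [
  { timestamp: new Date(3000), message: "this is a test" },
];

const snapshots = scanHistory([fromFile, fromLogger]);
console.log(snapshots.map(s => s.length)); // [ 2, 3 ]
```

With RxJS itself, the equivalent reducer would be passed to scan, for example `.scan((acc, x) => acc.concat(x), [])`, so that the async pipe hands *ngFor the full history on every emission rather than just the latest batch.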