How can I handle a JSON streaming GET request using RxJs

789 views Asked by At

I have an API endpoint that streams a JSON response. Now I want to use RxJs observables to stream the data chunks like this:

enter image description here

And suscribe to the observable in other classes. How can i do it? thanks!

I consume it in this way:

http.get({
        hostname: 
        port: 80,
        path: '/api/' + "v4" + '/contests/' + "contest" + '/event-feed',
        method: 'GET',
        headers: {
            'Content-Type': 'application/json'
        },
        qs: {
            strict: false,
            stream: true
        }
    }, (res) => {
        res.on('data', (chunk) => {
            rawData += chunk;
            let obj;
            try {
                obj = JSON.parse(chunk);
                console.log(obj)
            } catch (e) {
                if (e.constructor.name != "SyntaxError") console.log("[ERROR]: " + e);
                else console.log("...");
            }

        });
        res.on('end', () => {
            try {
                console.log("---FIN---\n");
                const parsedData = JSON.parse(rawData);
                console.log(parsedData);
            } catch (e) {
                console.error(e.message);
            }
        });

And it works.

1

There are 1 answers

0
dezfowler On

There's actually quite a lot of subtlety in what you're asking here and how it interacts with Observables which I'll try to delve into a bit...

One way of doing this is to create a Subject before you trigger the http.get which you then call the .next() method on when you receive data. You return the Subject and then subscribe to that. The challenge here is that the http operation may have already completed by the time you subscribe and you miss the data chunks.

A way to avoid missing stuff is to use ReplaySubject instead which will buffer and replay some values to new subscribers.

The more usual way to do it is to not actually trigger the http call until someone subscribes which you can do like this...

var http = require('https');
var { Observable } = require('rxjs');

function prepareConnection() {
    return new Observable(subscriber => {
        http.get({
            hostname: 
            port: 80,
            path: '/api/' + "v4" + '/contests/' + "contest" + '/event-feed',
            method: 'GET',
            headers: {
                'Content-Type': 'application/json'
            },
            qs: {
                strict: false,
                stream: true
           }
        }, (res) => {
            res.on('data', (chunk) => {
                let obj;
                try {
                    obj = JSON.parse(chunk);
                    subscriber.next(obj);
                } catch (e) {
                    subscriber.error(e);
                }        
            });
            res.on('end', () => {
                subscriber.complete();
            });
        });
    });
}

prepareConnection().subscribe({
    next(x) { console.log(x); },
    error(err) { console.error('something wrong occurred: ' + err); },
    complete() { console.log('done'); }
});

The function passed to the constructor executes when someone subscribes to the Observable with the subscriber being the first argument of that function that you can call next() etc on.

The caveat here being that each new subscriber will trigger an new http call. That may or may not be the behaviour you want - If it's not what you want you can use share to share one subscription e.g.

let connection = prepareConnection().pipe(share());

connection.subscribe({
   ...
});

connection.subscribe({
   ...
});