I've got an rxjs observer (really a Subject) that tails a file forever, just like tail -f. It's awesome for monitoring logfiles, for example.
This "forever" behavior is great for my application, but terrible for testing. Currently my application works but my tests hang forever.
I'd like to force an observer change to complete early, because my test code knows how many lines should be in the file. How do I do this?
I tried calling onCompleted on the Subject handle I returned but at that point it's basically cast as an observer and you can't force it to close, the error is:
Object # has no method 'onCompleted'
Here's the source code:
function ObserveTail(filename) {
source = new Rx.Subject();
if (fs.existsSync(filename) == false) {
console.error("file doesn't exist: " + filename);
}
var lineSep = /[\r]{0,1}\n/;
tail = new Tail(filename, lineSep, {}, true);
tail.on("line", function(line) {
source.onNext(line);
});
tail.on('close', function(data) {
console.log("tail closed");
source.onCompleted();
});
tail.on('error', function(error) {
console.error(error);
});
this.source = source;
}
And here's the test code that can't figure out how to force forever to end (tape style test). Note the "ILLEGAL" line:
test('tailing a file works correctly', function(tid) {
var lines = 8;
var i = 0;
var filename = 'tape/tail.json';
var handle = new ObserveTail(filename);
touch(filename);
handle.source
.filter(function (x) {
try {
JSON.parse(x);
return true;
} catch (error) {
tid.pass("correctly caught illegal JSON");
return false;
}
})
.map(function(x) { return JSON.parse(x) })
.map(function(j) { return j.name })
.timeout(10000, "observer timed out")
.subscribe (
function(name) {
tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
i++;
if (i >= lines) {
handle.onCompleted(); // XXX ILLEGAL
}
},
function(err) {
console.error(err)
tid.fail("err leaked through to subscriber");
},
function() {
tid.end();
console.log("Completed");
}
);
})
It sounds like you solved your problem, but to your original question
In general the use of
Subject
s is discouraged when you have better alternatives, since they tend to be a crutch for people to use programming styles they are familiar with. Instead of trying to use aSubject
I would suggest that you think about what each event would mean in an Observable life cycles.Wrap Event Emitters
There already exists wrapper for the
EventEmitter#on/off
pattern in the form ofObservable.fromEvent
. It handles clean up and keeping the subscription alive only when there are listeners. ThusObserveTail
can be refactored intoWhich has several benefits over the vanilla use of
Subjects
, one, you will now actually see the error downstream, and two, this will handle clean up of your events when you are done with them.Avoid *Sync Methods
Then this can be rolled into your file existence checking without the use of
readSync
Next you can simplify your filter/map/map sequence down by using
flatMap
instead.Don't signal, unsubscribe
How do you stop "signal" a stop when streams only travel in one direction. We rarely actually want to have an Observer directly communicate with an Observable, so a better pattern is to not actually "signal" a stop but to simply unsubscribe from the
Observable
and leave it up to the Observable's behavior to determine what it should do from there.Essentially your
Observer
really shouldn't care about yourObservable
more than to say "I'm done here".To do that you need to declare a condition you want to reach in when stopping.
In this case since you are simply stopping after a set number in your test case you can use
take
to unsubscribe. Thus the final subscribe block would look like:Edit 1
As pointed out in the comments, In the case of this particular api there isn't a real "close" event since Tail is essentially an infinite operation. In this sense it is no different from a mouse event handler, we will stop sending events when people stop listening. So your block would probably end up looking like:
The addition of the
finally
and theshare
operators creates an object which will attach to the tail when a new subscriber arrives and will remain attached as long as there is at least one subscriber still listening. Once all the subscribers are done however we can safelyunwatch
the tail.