Testing Subject using TestScheduler in RxJs

4.9k views Asked by At

I am using RxJs to count how many packets arrive in a particular time window. My code basically looks like this:

var packetSubject = new Rx.Subject();
var packetsInWindow = [];

function startMonitoring() {
    var subscription = packetSubject
        .windowWithTime(1000)
        .select(function(window) {
            window.toArray().subscribe(function(elements) {
                packetsInWindow.push(elements.length);
            });
        })
        .subscribe();
}

function newPacket(packet) {
    packetSubject.onNext(packet);
}

How to unit test this code using Rx TestScheduler? I could not find any suitable example for testing Subjects.

2

There are 2 answers

0
Emmanuel Kong On BEST ANSWER

Have a look this example :

   var x = 0,
       scheduler = new Rx.TestScheduler();

   var subject = new Rx.Subject();
   subject.throttle(100, scheduler).subscribe(function (value) {
       x = value;
   });

   scheduler.scheduleWithAbsolute(0, function () {
       subject.onNext(1);//trigger first event with value 1
   });
   scheduler.scheduleWithAbsolute(50, function () {
       expect(x).toEqual(0);//value hasn't been updated
   });
   scheduler.scheduleWithAbsolute(200, function () {
       expect(x).toEqual(1);//value update after throttle's windowDuration 
   });

   scheduler.start();

https://emmkong.wordpress.com/2015/03/18/how-to-unit-test-rxjs-throttle-with-rx-testscheduler/

0
Alex Eyhorn On

With RxJs 4.x you can call advanceBy on the TestScheduler. TestScheduler can be injected during setup. I've written a blog post about it.

Here is the example.

    var throttleWindowDuration = 2 * 1000; /* 2 seconds */

    function throttleTest() {
      var unthrottledStream = new Rx.Subject();
      var source = unthrottledStream.throttle(throttleWindowDuration);
      var result = {
        emitCounter: 0,
        unthrottledStream
      };

      var subscription = source.subscribe(
        function() {
          result.emitCounter++;
        });

      return result;
    }

    describe('run with test scheduler', function() {
      var testScheduler;
      var throttleSpy;

      beforeAll(function() {
        testScheduler = new Rx.TestScheduler();
        var originalThrottle = Rx.Observable.prototype.throttle;
        throttleSpy = spyOn(Rx.Observable.prototype, 'throttle')
          .and.callFake(function(dueTime) {
            return originalThrottle.call(this, dueTime, testScheduler);
          });
      });

      afterAll(function() {
        throttleSpy.and.callThrough();
      });

      it('advancing testScheduler allows to test throttling synchronously', function() {
        var throttleTestResult = throttleTest();

        //throttled stream will not emit if scheduler clock is at zero
        testScheduler.advanceBy(1);
        throttleTestResult.unthrottledStream.onNext();
        throttleTestResult.unthrottledStream.onNext();
        throttleTestResult.unthrottledStream.onNext();

        testScheduler.advanceBy(throttleWindowDuration);
        throttleTestResult.unthrottledStream.onNext();
        throttleTestResult.unthrottledStream.onNext();

        testScheduler.advanceBy(throttleWindowDuration);
        throttleTestResult.unthrottledStream.onNext();

        testScheduler.advanceBy(throttleWindowDuration);
        throttleTestResult.unthrottledStream.onNext();

        expect(throttleTestResult.emitCounter).toBe(4);
      });
    });

    describe('run without test scheduler', function() {
      it('without test scheduler the emit counter will stay at 1 ' 
        + 'as throttle duration is not elapsed', function() {
        var throttleTestResult = throttleTest();

        throttleTestResult.unthrottledStream.onNext();
        throttleTestResult.unthrottledStream.onNext();
        throttleTestResult.unthrottledStream.onNext();
        throttleTestResult.unthrottledStream.onNext();
        expect(throttleTestResult.emitCounter).toBe(1);
      });
    });

With RxJs 5 everything is a lot simpler. You can simply use jasmine.clock like you would for testing other JavaScript time-based operations. I've demonstrated the solution in my blog post