RxJava: OnNext Unsubscribe is not working


I'm trying to unsubscribe after receiving the first item from an observable, but it doesn't seem to work. What am I doing wrong?

    // RxJava 1.x imports (assumed from the Subscriber/unsubscribe API used below)
    import rx.Observable;
    import rx.Subscriber;

    public class ObservableAndSubscriber {

        public static void main(final String[] args) {
            final Observable<String> strObservable = Observable.create(s -> {
                while (true) {
                    s.onNext("Hello World!!");
                }
            });

            final Subscriber<String> strSubscriber = new Subscriber<String>() {

                @Override
                public void onCompleted() {
                }

                @Override
                public void onError(final Throwable e) {
                    e.printStackTrace();
                }

                @Override
                public void onNext(final String t) {
                    System.out.println(t);
                    this.unsubscribe();
                }
            };
            strObservable.subscribe(strSubscriber);
        }
    }

The program prints "Hello World!!" in an infinite loop.

There are 2 answers

Vanchinathan Chandrasekaran On BEST ANSWER

I did this example while I was reading the book by Tomasz Nurkiewicz. He later had a similar example and explained what was wrong with my code.

Infinite loops are evil! I had one inside the lambda passed to Observable.create, and that lambda was executed on my main thread, so subscribe() blocked indefinitely. The fix is to run the lambda on a different thread, so that the main thread is never blocked and subscribe() returns. I asked for the subscription to happen on an IO thread and that did the trick: the infinite loop now runs on an IO thread and does not block the main thread.

    strObservable.subscribeOn(Schedulers.io()).subscribe(strSubscriber);
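The threading effect described in this answer can be illustrated without RxJava. The following is only a plain-Java sketch of the idea, not the library's implementation: the class `BackgroundProducer` and the helpers `startProducer` and `runDemo` are hypothetical names invented for this example. An endless producer loop is started on a background thread, so the caller gets control back straight away instead of being stuck inside the loop.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class BackgroundProducer {

    /** Hypothetical helper: starts an endless producer on a daemon thread and returns at once. */
    static Thread startProducer(AtomicBoolean cancelled, CountDownLatch firstItem) {
        Thread worker = new Thread(() -> {
            while (!cancelled.get()) {   // the loop still needs a stop condition to ever finish
                firstItem.countDown();   // stands in for s.onNext(...)
            }
        }, "producer");
        worker.setDaemon(true);          // a daemon thread does not keep the JVM alive
        worker.start();
        return worker;
    }

    /** Returns true if the caller got control back, saw an item, and the producer stopped. */
    static boolean runDemo() {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        CountDownLatch firstItem = new CountDownLatch(1);
        try {
            Thread worker = startProducer(cancelled, firstItem);
            // Execution reaches this point although the producer loop is still running.
            boolean sawItem = firstItem.await(5, TimeUnit.SECONDS);
            cancelled.set(true);         // ask the producer to stop
            worker.join(5000);
            return sawItem && !worker.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("caller not blocked, producer stopped: " + runDemo());
    }
}
```

Note that moving the loop to another thread only unblocks the caller. The loop itself ends only because it checks the `cancelled` flag.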
Yaroslav Stavnichiy On

For unsubscribe to work, you need to check the subscription status inside your loop. Otherwise the loop runs forever and burns CPU.
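The status check can be sketched in plain Java. This is a simplified model under stated assumptions, not RxJava code: `MiniSubscriber` and `emitUntilUnsubscribed` are hypothetical names that stand in for the subscriber and for the lambda passed to Observable.create. In RxJava 1.x the corresponding check in the question's code would be `while (!s.isUnsubscribed())` in place of `while (true)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class CooperativeUnsubscribe {

    /** Hypothetical stand-in for a subscriber: collects items and carries a cancellation flag. */
    static final class MiniSubscriber {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
        final List<String> received = new ArrayList<>();

        void onNext(String item) {
            received.add(item);
            unsubscribe();               // same idea as this.unsubscribe() in the question
        }

        void unsubscribe() {
            unsubscribed.set(true);
        }

        boolean isUnsubscribed() {
            return unsubscribed.get();
        }
    }

    /** Producer that checks the flag before every emission; returns how many items it emitted. */
    static int emitUntilUnsubscribed(MiniSubscriber s) {
        int emitted = 0;
        while (!s.isUnsubscribed()) {    // with while (true) this loop would never end
            s.onNext("Hello World!!");
            emitted++;
        }
        return emitted;
    }

    public static void main(String[] args) {
        MiniSubscriber s = new MiniSubscriber();
        int emitted = emitUntilUnsubscribed(s);
        System.out.println("emitted=" + emitted + ", received=" + s.received);
        // prints: emitted=1, received=[Hello World!!]
    }
}
```

Unsubscribing only sets a flag. The producer stops only because it reads that flag on every iteration.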

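The easiest way to deal with infinite sequences is to use the library-provided methods Observable.generate() or Observable.fromIterable(), which do the proper checking for you. (These are the RxJava 2 names; in RxJava 1.x, which the question's Subscriber API suggests, the closest equivalents are SyncOnSubscribe and Observable.from(Iterable).)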

Also make sure you subscribe and observe on different threads. Otherwise you will only ever serve a single subscriber.

Your example:

    strObservable = Observable.generate(g -> g.onNext("Hello!"));
    strObservable.subscribeOn(Schedulers.newThread()).subscribe(strSubscriber);