I'm trying to unsubscribe after receiving the first item from an observable. And it seems to not work. What am I doing wrong?
public class ObservableAndSubscriber {
public static void main(final String[] args) {
final Observable<String> strObservable = Observable.create(s -> {
while (true) {
s.onNext("Hello World!!");
}
});
final Subscriber<String> strSubscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(final Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(final String t) {
System.out.println(t);
this.unsubscribe();
}
};
strObservable.subscribe(strSubscriber);
}
}
The result seems to print "Hello World" in an infinite loop.
I did this example while I was reading the book by Tomasz Nurkiewicz. He later had a similar example and explained what was wrong with my code.
Infinite loops are evil! Since, I had one in my observable create lambda expression, it is getting executed in the context of my main thread and subscribe blocks indefinitely. In order to get the observable create lambda execute in a different thread,which means the main thread never gets blocked and subscribe() completes. I asked the subscription to happen on a IO thread and that did the trick, which means the infinite loop is happening on a IO thread and does not block the main thread.