I am trying to create an RxJava BlockingObservable
that will emit the value of a variable every X milliseconds until (condition == true) or a timeout occurs.
The code below seems close to what I want, but it always emits ONCE and then exits. What's odd is that I have a condition in takeUntil()
which will NEVER be true -- I'd expect this observable to emit continuously and eventually time out, but it doesn't.
What am I missing/doing wrong here?
    Observable.fromCallable(() -> getSendWindow())
        .sample(10, TimeUnit.MILLISECONDS)
        .timeout(30, TimeUnit.SECONDS)
        .takeUntil(sendWindow -> 1==2)
        .doOnError(throwable -> log.warn("Timed out waiting for send window to clear. Giving up."))
        .doOnCompleted(() -> {
            log.info("Send window cleared");
        })
        .toBlocking().forEach(sendWindow -> log.info("sendWindow={}", getSendWindow()));
.sample() does not do what you think it does. It rate-limits the Observable above to at most one emission every 10 milliseconds.
Observable.fromCallable() emits only one event and then completes.
.sample() emits the most recent event (if there is one) once per 10-millisecond window. Attached to an Observable that has only one event, it therefore emits just that one event and then completes.
What you probably actually want (I'm a .NET programmer, so excuse my casing etc.) is a source that keeps emitting on a timer, rather than one that emits once.
Edit: Thanks to @akanokd for pointing out that Java uses interval for repeated events.
Feel free to edit this answer with the API calls for the Java-specific version.
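For reference, here is a minimal sketch of what the interval-based version might look like in Java. It assumes RxJava 1.x (the rx.Observable package, matching the doOnCompleted/toBlocking calls in the question). The class SendWindowPoller, the method pollUntilClear, the Supplier parameter, and the "window is clear when it reaches 0" condition are all made up for illustration and are not from the original post. One more thing to be aware of: timeout(30, TimeUnit.SECONDS) measures the gap between consecutive items, so with a steady interval source it would not fire. The sketch instead uses takeUntil with a timer as an overall deadline.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import rx.Observable;

// Hypothetical helper, not from the original post; assumes RxJava 1.x.
class SendWindowPoller {

    // Reads getSendWindow every periodMs until it returns 0 (assumed "clear"
    // condition) or until timeoutMs has elapsed overall, whichever comes first.
    // Returns every value that was emitted, in order.
    static List<Integer> pollUntilClear(Supplier<Integer> getSendWindow,
                                        long periodMs, long timeoutMs) {
        return Observable.interval(periodMs, TimeUnit.MILLISECONDS)
                // interval emits 0, 1, 2, ...; re-read the variable on each tick
                .map(tick -> getSendWindow.get())
                // emits the matching item, then completes
                .takeUntil(window -> window == 0)
                // overall deadline: completes (without an error) when the timer fires
                .takeUntil(Observable.timer(timeoutMs, TimeUnit.MILLISECONDS))
                .toList()
                .toBlocking()
                .single();
    }

    public static void main(String[] args) {
        // Stand-in for the real send window: 3, 2, 1, 0 on successive reads.
        AtomicInteger fakeWindow = new AtomicInteger(3);
        List<Integer> seen = pollUntilClear(fakeWindow::getAndDecrement, 10, 5_000);
        System.out.println("sendWindow values seen: " + seen);
    }
}
```

Because the deadline here completes the stream quietly instead of raising a TimeoutException, the caller would need to check whether the last value actually met the condition to tell "cleared" apart from "gave up".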