RxJava Observable continue processing stream after error

164 views Asked by At

Consider this example:

while(currentObjectNumber < maximumObjectNumber){

        maximumObjectNumber = getMaximumObjectNumber();

        CountDownLatch countDownLatch = new CountDownLatch(maximumObjectNumber - currentObjectNumber);

        Observable<MyObject> observable = catchUpToMaximumObject(currentObjectNumber);

        Subscription subscription = observable.subscribe(myObject ->{
            //do some work with myObject
            currentObjectNumber += 1;
            countDownLatch.countDown();
        }, throwable -> {
            System.out.println("ERROR CAUGHT");

            int x = maximumObjectNumber - currentObjectNumber;
            while(x != 0){
                countDownLatch.countDown();
                x -= 1;
            }

        });

        countDownLatch.await();
        subscription.unsubscribe();
}

Essentially I am getting an error when generating myObject from a HTTP response, however, I want to ignore this error and try again from the failed object. This error happens randomly and is not because of my code.

Currently, my code achieves this, however, it is quite inefficient as maximumObjectNumber is very large and every time there is an error the countDownLatch needs to go to 0. Also, maximumObjectNumber increases indefinitely.

I have been looking at the retry() method but it replays already processed myObjects. I am pretty sure this is the wrong way to go about it anyway but nothing on the web I have found helps improve my understanding in this situation.

The only other idea I have is to implement an abort() method for my countDownLatch so I don't have to countdown to 0 on every error.

0

There are 0 answers