RxJava2: Unable to handle exception for asynchronous callback using retryWhen


I'm trying to connect to an MQTT broker. I want to retry in case I fail to connect. The client gives me a callback on success or failure of the connection.

After reading multiple examples of retryWhen and of handling asynchronous callbacks, I put together the code below. It works fine when the connection succeeds. It also retries 3 times if I call e.onError(throwable) synchronously from the Flowable. But it crashes my Android app if I call e.onError(throwable) from the callback's onFailure() method.

Here's the code:

RxJava chain

createConnectionFlowable(client, options)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .retryWhen(createRetryFunction())
    .subscribe(createConsumer());

Create a Flowable

private Flowable<String> createConnectionFlowable(final MqttAndroidClient client, final MqttConnectOptions options) {
    return Flowable.create(new FlowableOnSubscribe<String>() {

        public void subscribe(final FlowableEmitter<String> e) throws Exception {
            client.connect(options).setActionCallback(new IMqttActionListener() {
                public void onSuccess(IMqttToken iMqttToken) { e.onComplete(); }
                public void onFailure(IMqttToken iMqttToken, Throwable throwable) { e.onError(throwable); }
            });
        }
    }, BackpressureStrategy.BUFFER);
}

Create a retry function

private Function<Flowable<Throwable>, Publisher<?>> createRetryFunction() {
    return new Function<Flowable<Throwable>, Publisher<?>>() {

        public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
            return throwableFlowable.zipWith(
                    Flowable.range(1, 3),
                    new BiFunction<Throwable, Integer, Integer>() {
                        public Integer apply(Throwable throwable, Integer integer) throws Exception { return integer; }
                    }
            )
            .flatMap(new Function<Integer, Publisher<?>>() {
                public Publisher<?> apply(Integer integer) throws Exception {
                    return Flowable.timer(integer, TimeUnit.SECONDS);
                }
            });
        }
    };
}
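For reference, here is a rough plain-Java model of what the retry function above is meant to do: retry up to 3 times, waiting 1, 2 and then 3 seconds between attempts. It deliberately uses a plain loop instead of retryWhen so it runs without RxJava; `RetryModel`, the `Sleeper` hook and `retryWithLinearBackoff` are hypothetical names for this sketch only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class RetryModel {

    /** Hypothetical hook so the delays can be recorded instead of (or as well as) slept. */
    interface Sleeper {
        void sleepSeconds(int seconds) throws InterruptedException;
    }

    /** Calls attempt; after failure n (n <= maxRetries) waits n seconds, then rethrows the last error. */
    static <T> T retryWithLinearBackoff(Callable<T> attempt, int maxRetries, Sleeper sleeper)
            throws Exception {
        for (int retry = 1; ; retry++) {
            try {
                return attempt.call();
            } catch (Exception e) {
                if (retry > maxRetries) {
                    throw e; // retries exhausted: surface the last error
                }
                sleeper.sleepSeconds(retry); // like Flowable.timer(integer, TimeUnit.SECONDS)
            }
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> delays = new ArrayList<>();
        int[] calls = {0};
        String result = retryWithLinearBackoff(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("connection refused");
            }
            return "connected";
        }, 3, seconds -> {
            delays.add(seconds);
            Thread.sleep(seconds * 1000L); // real wait, as in the Rx chain
        });
        // prints "connected after 3 attempts, delays [1, 2]"
        System.out.println(result + " after " + calls[0] + " attempts, delays " + delays);
    }
}
```

One difference to keep in mind: this model rethrows the last error once the retries run out, whereas retryWhen completes the stream if the handler Publisher completes, which a zipWith(Flowable.range(...)) handler does once the range is used up.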

The Consumer: do all the good stuff here

private Consumer<String> createConsumer() {
    return new Consumer<String>() {
        public void accept(String s) throws Exception {
            Log.d(TAG, "accept: do important stuff here" + s);
        }
    };
}

Error logs

12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply() called with: throwable = [Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)], integer = [1]
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply: delay retry by seconds:1
12-20 11:51:09.589 16769-16830/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000
12-20 11:51:09.600 16769-16831/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:590)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at java.lang.Thread.run(Thread.java:818)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at libcore.io.IoBridge.isConnected(IoBridge.java:234)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at libcore.io.IoBridge.connectErrno(IoBridge.java:171)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at libcore.io.IoBridge.connect(IoBridge.java:122)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:183)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:452)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at java.net.Socket.connect(Socket.java:884)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:   ... 2 more
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: android.system.ErrnoException: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at libcore.io.IoBridge.isConnected(IoBridge.java:223)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:   ... 8 more
12-20 11:51:09.606 16769-16769/com.work.app E/AndroidRuntime: FATAL EXCEPTION: main
                                                                     Process: com.work.app, PID: 16769
                                                                     Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: 

Questions

  1. Why does this code throw an Exception that crashes the app? Shouldn't the exception be handled? What am I missing here?
  2. Why does it not retry 3 times?
  3. Why does the same code retry correctly if I call e.onError(throwable) synchronously from the Flowable.subscribe() method?

References

  1. RxJava 1.x retryWhen doc
  2. This blog

There are 2 answers

Pravin Sonawane (accepted answer)

I finally got this working!

It turns out that this is not a problem with RxJava2 but with the way MQTT (Eclipse Paho) runs the IMqttActionListener callback on the main thread, even though the client was created on a different thread!

The simple solution is to wait for the client to connect on the thread it was created on. The code shared in the question is correct except for this method:

@NonNull
public Flowable<Boolean> createConnectionFlowable(final MqttAndroidClient client, final MqttConnectOptions options) {
    return Flowable.create(new FlowableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(final FlowableEmitter<Boolean> e) throws Exception {
            IMqttToken connect = client.connect(options);
            connect.waitForCompletion(); //blocks this (io) thread until connect finishes, which is what was required; throws MqttException if it fails
            if (client.isConnected()) {
                e.onNext(true);
                e.onComplete();
            } else {
                e.onError(connect.getException());
            }

        }
    }, BackpressureStrategy.BUFFER);
}
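The same idea can be modeled in plain Java to show why it helps: the outcome is reported from the thread that is waiting (the io thread in the Rx chain), not from whatever thread runs the callback. This is a minimal sketch, assuming a hypothetical `AsyncConnector` interface in place of the Paho client, with `CompletableFuture.join()` playing the role of `waitForCompletion()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class BlockingConnectModel {

    /** Hypothetical async API: later calls exactly one callback, on a thread of its choosing. */
    interface AsyncConnector {
        void connect(Runnable onSuccess, Consumer<Throwable> onFailure);
    }

    /** Blocks the calling thread until the connector reports back, then rethrows any failure. */
    static void connectBlocking(AsyncConnector connector) throws Throwable {
        CompletableFuture<Void> done = new CompletableFuture<>();
        connector.connect(() -> done.complete(null), done::completeExceptionally);
        try {
            done.join(); // plays the role of IMqttToken.waitForCompletion()
        } catch (CompletionException e) {
            throw e.getCause(); // unwrap so the caller sees the original failure
        }
    }

    public static void main(String[] args) throws Exception {
        // Fake connector that always fails, reporting from its own "callback" thread.
        AsyncConnector failing = (onSuccess, onFailure) ->
                new Thread(() -> onFailure.accept(new IllegalStateException("ECONNREFUSED"))).start();

        ExecutorService io = Executors.newSingleThreadExecutor(); // stands in for Schedulers.io()
        String outcome = io.submit(() -> {
            try {
                connectBlocking(failing);
                return "connected";
            } catch (Throwable t) {
                // The failure surfaces here, on the waiting thread, not on the callback thread.
                return "failed on " + Thread.currentThread().getName() + ": " + t.getMessage();
            }
        }).get();
        io.shutdown();
        System.out.println(outcome);
    }
}
```

This only works because the wait happens on a different thread from the one that delivers the callback; if the delivering thread were the one blocked in join(), the future would never complete.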

Hope this helps someone working with these libraries :)

Kiskae

  1. Since you subscribe using a Consumer<String>, you do not define an error handler for the stream. The error is therefore passed to RxJavaPlugins.onError(...), which, when no handler has been installed with RxJavaPlugins.setErrorHandler(...), prints the stack trace and forwards the error to the thread's uncaught exception handler; on Android that is a fatal crash. To fix this, also pass an error consumer, e.g. subscribe(onNext, onError), or subscribe with a full Subscriber<String> (a Flowable takes a Subscriber rather than an Observer).
  2. The log suggests the client failed three times ("onFailure" appears three times) independently of anything Rx is doing. If I had to guess, the client is stateful, so after the initial connect, follow-up calls to client.connect(...) behave oddly. Since the log shows an error, a 1-second wait, and then two errors at once, I guess the earlier callback remains registered, so the second failure gets sent to RxJava twice.
  3. If by "synchronously" you mean using the waitForCompletion() method, that would support my assumption in point 2: since no callbacks are registered, each throwable is reported only once, which fixes the behavior.

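The emitter should not deliver anything after it has terminated (onError/onComplete), since the spec mandates that those methods are called at most once. What RxJava 2 does instead is route an onError that arrives after termination to RxJavaPlugins.onError(...), which ends in the same default handler as in point 1, so a stale callback firing a second time could crash the app even while retries are still pending.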
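A minimal sketch of the "at most one terminal signal" rule, assuming a hypothetical `OnceTerminalGate` wrapper placed between a callback and the downstream: the first terminal event is forwarded and any later error goes to a fallback handler. This mirrors, in spirit only, how RxJava 2's emitters hand an onError that arrives after termination to RxJavaPlugins.onError(...) instead of downstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class OnceTerminalGate {
    private final AtomicBoolean terminated = new AtomicBoolean(false);
    private final Runnable downstreamComplete;
    private final Consumer<Throwable> downstreamError;
    private final Consumer<Throwable> undeliverable;

    OnceTerminalGate(Runnable downstreamComplete,
                     Consumer<Throwable> downstreamError,
                     Consumer<Throwable> undeliverable) {
        this.downstreamComplete = downstreamComplete;
        this.downstreamError = downstreamError;
        this.undeliverable = undeliverable;
    }

    /** Forwards completion only if nothing terminal was signalled before; otherwise ignores it. */
    void onComplete() {
        if (terminated.compareAndSet(false, true)) {
            downstreamComplete.run();
        }
    }

    /** Forwards the first terminal error; any later error goes to the fallback handler. */
    void onError(Throwable t) {
        if (terminated.compareAndSet(false, true)) {
            downstreamError.accept(t);
        } else {
            undeliverable.accept(t);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        OnceTerminalGate gate = new OnceTerminalGate(
                () -> log.add("complete"),
                t -> log.add("error: " + t.getMessage()),
                t -> log.add("undeliverable: " + t.getMessage()));

        // Simulates a stale listener reporting the same failure twice (the behaviour suspected in point 2).
        gate.onError(new IllegalStateException("connection refused"));
        gate.onError(new IllegalStateException("connection refused"));
        // prints [error: connection refused, undeliverable: connection refused]
        System.out.println(log);
    }
}
```

In RxJava itself the fallback is whatever RxJavaPlugins.setErrorHandler(...) installed; without one, the duplicate error takes the same crash path described in point 1.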