I am using Flowable.create to read data in a loop from a Bluetooth socket's InputStream. This is the pseudocode I tried:
// connectBluetoothFlowable is a Flowable<Boolean>
connectBluetoothFlowable.flatMap(bluetoothConnected -> {
    if (!bluetoothConnected) {
        return Flowable.error(new Exception("ERROR"));
    }
    return Flowable.create(emitter -> {
        while (true) {
            // read data from the Bluetooth socket InputStream
            // String data = ...;
            // emit the data
            Log.e("emitter", data);
            emitter.onNext(data);
            // if(...) {
            //     break;
            // }
        }
        emitter.onComplete();
    }, BackpressureStrategy.BUFFER);
}).subscribeOn(Schedulers.io())
    .unsubscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(data -> {
        // onNext callback
        Log.e("result", "result: " + data);
    }, throwable -> {
        // onError
    }, () -> {
        // onComplete
    }, subscription -> {
        // onSubscribe
    });
However, after Log.e("emitter", data) has logged many times, Log.e("result", "result: " + data) in the onNext callback no longer logs anything. I don't understand why. Could anyone help?
I am using RxJava 2.2.19 and RxAndroid 2.1.1.
I also tried adding onBackpressureBuffer() after Flowable.create, as in the code below, but the result is the same: after Log.e("emitter", data) has logged many times, Log.e("result", "result: " + data) in the onNext callback no longer logs anything.
// connectBluetoothFlowable is a Flowable<Boolean>
connectBluetoothFlowable.flatMap(bluetoothConnected -> {
    if (!bluetoothConnected) {
        return Flowable.error(new Exception("ERROR"));
    }
    return Flowable.create(emitter -> {
        while (true) {
            // read data from the Bluetooth socket InputStream
            // String data = ...;
            // emit the data
            Log.e("emitter", data);
            emitter.onNext(data);
            // if(...) {
            //     break;
            // }
        }
        emitter.onComplete();
    }, BackpressureStrategy.BUFFER).onBackpressureBuffer();
}).subscribeOn(Schedulers.io())
    .unsubscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(data -> {
        // onNext callback
        Log.e("result", "result: " + data);
    }, throwable -> {
        // onError
    }, () -> {
        // onComplete
    }, subscription -> {
        // onSubscribe
    });
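The read step that the pseudocode elides could be sketched roughly as follows. This is only a minimal illustration in plain Java, not the actual code from the app: the class name StreamReadLoop, the helper readLoop, the 1024-byte buffer, and the UTF-8 decoding are all assumptions, and a ByteArrayInputStream stands in for the Bluetooth socket's InputStream.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

public class StreamReadLoop {

    // Hypothetical helper: reads chunks from the stream and hands each one to
    // onData until the stream ends or the cancelled flag becomes true.
    public static void readLoop(InputStream in,
                                BooleanSupplier cancelled,
                                Consumer<String> onData) {
        byte[] buffer = new byte[1024]; // assumed chunk size
        try {
            while (!cancelled.getAsBoolean()) {
                int n = in.read(buffer); // blocks until data arrives or the stream ends
                if (n < 0) {
                    break; // end of stream
                }
                if (n > 0) {
                    // Assumes UTF-8 text; a multi-byte character split across
                    // two reads would need extra handling.
                    onData.accept(new String(buffer, 0, n, StandardCharsets.UTF_8));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the Bluetooth socket stream.
        InputStream in = new ByteArrayInputStream("hello".getBytes(StandardCharsets.UTF_8));
        List<String> received = new ArrayList<>();
        readLoop(in, () -> false, received::add);
        System.out.println(received);
    }
}
```

Inside Flowable.create, a helper like this could presumably be called as readLoop(inputStream, emitter::isCancelled, emitter::onNext), followed by emitter.onComplete(), so that the loop exits once the subscriber cancels instead of spinning in while (true).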