How to send cancel signal to ConnectableFlowable?

167 views Asked by At

I use disposable Flowable to emit and subscribe items. But when I try to use ConnectableFlowable I can not send cancel signal to emitter. How can I understand flowable is disposed inside Flowable.create method?

You can see the scenario by comment and uncomment 'publish().autoConnect()' code snipped.

Disposable disposable = Flowable.create(emitter -> {
        AtomicBoolean isRunning = new AtomicBoolean(true);
        AtomicInteger i = new AtomicInteger();
        new Thread(() -> {
            while (isRunning.get()) {
                i.getAndIncrement();
                System.out.println("Emitting:" + i.get());
                emitter.onNext(i.get());
                try {
                    Thread.sleep(1_000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        emitter.setCancellable(() -> {
            System.out.println("Cancelled");
            isRunning.set(false);
        });
    }, BackpressureStrategy.BUFFER)
            .publish() //comment here
            .autoConnect() //and here
            .subscribe(s -> {
                System.out.println("Subscribed:" + s);
            });

    Thread.sleep(10_000);
    disposable.dispose();
    Thread.sleep(100_000);
1

There are 1 answers

0
akarnokd On BEST ANSWER

There is an overload that gives you access to the Disposable to cancel the connection:

SerialDisposable sd = new SerialDisposable();

source.publish().autoConnect(1, sd::set);

// ...

sd.dispose();