Create hot observable


I am taking my first steps with Rx. I have read a bit about it, but I thought getting my hands dirty would be the better way to learn. So I started converting one of my existing pieces of code to the Rx style.

The goal: I am trying to mock a source that sends out data at a specific frequency (say 60 items per second, like a video camera). I have recorded footage that I use to simulate the source while the real one is not available. The mock needs to start sending even if no one is listening, because that's what the real source would do.

Before Rx, I wrote a Runnable that just iterates over the 15,000 data items, sends each item to my RabbitMQ server, sleeps for 1/60 s, and then sends the next one.
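
For comparison, the pre-Rx loop described above could look roughly like the following. This is only a minimal sketch, not the original code: `ReplayLoop`, `replay`, and the `sink` callback are hypothetical names, and the sink stands in for the RabbitMQ publish call.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

class ReplayLoop {
    /**
     * Sends each item to the sink, then sleeps for one period.
     * Stops early when running reports false or the thread is interrupted.
     * Returns the number of items that were sent.
     */
    static <T> int replay(List<T> items, int frequencyHz,
                          Consumer<T> sink, BooleanSupplier running) {
        // Period in milliseconds. Note that 1 / frequencyHz would be 0
        // in integer arithmetic, so the division starts from 1000.
        long periodMillis = 1000L / frequencyHz;
        int sent = 0;
        for (T item : items) {
            if (!running.getAsBoolean()) {
                break;
            }
            sink.accept(item); // hypothetical stand-in for basicPublish(...)
            sent++;
            try {
                Thread.sleep(periodMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                break;
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        List<String> frames = Arrays.asList("frame-1", "frame-2", "frame-3");
        int sent = replay(frames, 60, System.out::println, () -> true);
        System.out.println(sent + " items sent");
    }
}
```

Wrapping the call to `replay` in a Runnable and handing it to a thread gives the "sends even if no one is listening" behaviour, since the loop never waits for a consumer.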

Now I want to turn that logic into a hot observable, just for playing around. So far I have this:

Observable.from(mDataItems)
                .takeWhile(item -> mRunning)
                .map(mGson::toJson)
                .doOnNext(json -> {
                    try {
                        mChannel.basicPublish(EXCHANGE_NAME, "", null, json.getBytes());
                    } catch (IOException e) {
                        Logger.error(e, String.format("Could not publish to %s exchange", EXCHANGE_NAME));
                    }

                    try {
                        Thread.sleep(1000 / SENDING_FREQUENCY_IN_HZ); // milliseconds; 1 / SENDING_FREQUENCY_IN_HZ is 0 in integer arithmetic
                    } catch (InterruptedException e) {
                        Logger.error(e, String.format("Could not sleep for %d ms", (int) (1000 / SENDING_FREQUENCY_IN_HZ)));
                    }
                })
                .doOnCompleted(() -> {
                    if (mRunning)
                        Logger.info("All data sent");
                    else
                        Logger.info("Interrupted while sending");

                    disconnect();
                    mRunning = false;
                })
                .subscribeOn(Schedulers.io())
                .publish()
                .connect(); 

Even though it works so far, I don't know if this is the "good" way to create a hot Observable (or an Observable in general, for that matter) that just emits items. (I also don't know if I should use a Subject instead of an Observable, but that's another question.)

Answer by akarnokd:

Yes, there is an alternative:

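// Period between items in milliseconds (integer division, e.g. 1000 / 60 = 16).
int delay = 1000 / frequency;
// publish() returns a ConnectableObservable; the plain Observable type has no connect().
ConnectableObservable<String> o = Observable.from(dataItems)
.zipWith(
    // Pace the items: one item per tick; ticks are dropped if the zip falls behind.
    Observable.timer(delay, delay, TimeUnit.MILLISECONDS)
        .onBackpressureDrop(),
    (s, t) -> s)
.map(mGson::toJson)
// other ops as necessary
.subscribeOn(Schedulers.io())
.publish();

// Start emitting right away, whether or not anyone has subscribed yet.
o.connect();

// Subscribers that attach later only see items emitted from that point on.
o.subscribe(...);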
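
To see what "hot" means here independently of RxJava, the following is a rough sketch using only the JDK. It is not RxJava and not how `publish()`/`connect()` are implemented: `HotSource` and its methods are hypothetical names, and a `ScheduledExecutorService` plays the role of the timer that paces the items. The source starts emitting as soon as `start` is called, and a listener that attaches later only receives the items emitted after it attached.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class HotSource<T> {
    // Listeners can be added while emission is in progress.
    private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();
    private final CountDownLatch done = new CountDownLatch(1);
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();

    void subscribe(Consumer<T> listener) {
        listeners.add(listener);
    }

    /** Starts emitting one item per period, even if nobody is subscribed. */
    void start(List<T> items, long periodMillis) {
        Iterator<T> it = items.iterator(); // only touched by the timer thread
        timer.scheduleAtFixedRate(() -> {
            if (it.hasNext()) {
                T item = it.next();
                for (Consumer<T> listener : listeners) {
                    listener.accept(item);
                }
            } else {
                timer.shutdown();  // no more items: stop the periodic task
                done.countDown();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Waits until all items were emitted; returns false on timeout or interrupt. */
    boolean awaitCompletion(long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        HotSource<Integer> source = new HotSource<>();
        source.start(Arrays.asList(1, 2, 3, 4, 5, 6), 1000 / 60);
        Thread.sleep(50); // the source is already emitting during this pause
        source.subscribe(i -> System.out.println("late subscriber got " + i));
        source.awaitCompletion(2000);
    }
}
```

Which items the late subscriber in `main` sees depends on timing, which is exactly the property that distinguishes a hot source from a cold one that replays everything for each subscriber.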