Chain Observables


I have a collection of objects; call each one o. Each has an act() method, which will eventually cause that object's event() observable to call onComplete.

What is a good way to chain these?

That is: call o.act(), wait for o.event() to complete, then call o2.act(), and so on for an arbitrary number of objects in the collection.

So the signature is like so:

public class Item {
    // Typed to match the Observable<ReturnType> returned by event()
    protected final PublishSubject<ReturnType> event = PublishSubject.create();

    public Observable<ReturnType> event() {
        return event;
    }

    public void act() {
        // do a bunch of stuff
        event.onComplete();
    }
}

And then in the consuming code:

Collection<Item> items...
foreach item in items
  item.act -> await item.event().onComplete() -> call next item.act() -> so on
1 Answer

Best answer, by weston:

If I understand correctly, your objects have this kind of signature:

public class Item {
    public Observable<ReturnType> event()...
    public ReturnType act()...
}

So if they were filled out like so:

public class Item {

    private final String data;
    private final Observable<ReturnType> event;

    public Item(String data) {
        this.data = data;

        event = Observable
                .fromCallable(this::act);
    }

    public Observable<ReturnType> event() {
        return event;
    }

    public ReturnType act() {
        System.out.println("Item.act: " + data);
        return new ReturnType();
    }
}

They could then be chained like so:

Item item1 = new Item("a");
Item item2 = new Item("b");
Item item3 = new Item("c");

item1.event()
        .concatWith(item2.event())
        .concatWith(item3.event())
        .subscribe();

Result:

Item.act: a
Item.act: b
Item.act: c

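Then if you have an Iterable collection, you could use concatMap, which subscribes to each item's event() only after the previous one has completed (flatMap would merge them and does not guarantee that order):

Iterable<Item> items = Arrays.asList(item1, item2, item3);

Observable.from(items)
        .concatMap(Item::event)
        .subscribe();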

Alternative

An alternative that's more like your case might be:

public class Item {
    private final PublishSubject<Void> event = PublishSubject.create();

    private final String data;

    public Item(String data) {
        this.data = data;
    }

    public Observable<Void> event() {
        return event;
    }

    public Void act() {
        System.out.println("Item.act: " + data);
        // do a bunch of stuff
        event.onCompleted();
        return null;
    }
}

Usage:

Iterable<Item> iterable = Arrays.asList(item2, item3);

item1.event()
        .concatWith(Observable.from(iterable)
                .map(Item::act))
        .subscribe();

item1.act();

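Note that this only subscribes to event() on the first item. Items 2 onwards are run by calling act() directly, so their event() observables are never used.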
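If event() should be awaited for every item, one option is to combine concatMap with doOnSubscribe. The following is only a sketch, assuming RxJava 1.x (the rx package, as in the code above). The ChainItems wrapper class, the runChain() method and the log list are hypothetical names added for illustration, and act() returns void here to keep it short. It relies on a PublishSubject delivering its completion to a subscriber that arrives after onCompleted() was called, which matters when act() finishes synchronously.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import rx.Observable;
import rx.subjects.PublishSubject;

public class ChainItems {

    // Same shape as the PublishSubject-based Item above. The shared log
    // list is an addition so that the call order can be inspected.
    static class Item {
        private final PublishSubject<Void> event = PublishSubject.create();
        private final String data;
        private final List<String> log;

        Item(String data, List<String> log) {
            this.data = data;
            this.log = log;
        }

        Observable<Void> event() {
            return event;
        }

        void act() {
            log.add("act " + data);
            // do a bunch of stuff
            event.onCompleted();
        }
    }

    static List<String> runChain() {
        List<String> log = new ArrayList<>();
        List<Item> items = Arrays.asList(
                new Item("a", log), new Item("b", log), new Item("c", log));

        Observable.from(items)
                // concatMap moves on to the next item only after the current
                // item's event() has completed; doOnSubscribe triggers act()
                // at the moment that item's turn comes up.
                .concatMap(item -> item.event().doOnSubscribe(item::act))
                .subscribe(
                        v -> { },
                        Throwable::printStackTrace,
                        () -> log.add("done"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runChain()); // [act a, act b, act c, done]
    }
}
```

Because act() is only triggered when concatMap reaches the item, this should also work when act() completes its subject later on another thread. In that case the subscribe() call returns before the chain has finished.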