How to create an observable of the difference between two sorted observables?

432 views Asked by At

Let's say I have two observables of sorted doubles. I'd like to get the difference between them as an observable. For instance:

           1       2           4        
 left:   ──o───────o───────────o────/
           1    3  4   5
 right:  ──o────o──o───o───/
                   l2          r3   r5
 output: ──────────o───────────o────o─/

The imperative implementation for this is simple: Keep a list of the items on the side that you've still not reached and "emit" the items from the other side.

What is the canonical approach to this in the world of RFP? I'm specifically using RxScala.

2

There are 2 answers

8
Alexander Perfilyev On

This is how I would do it in rxjava implied that two observables have the same length.

    Observable<Integer> obs1 = Observable.just(1, 2, 4, 6);
    Observable<Integer> obs2 = Observable.just(1, 3, 4, 5);

    obs1.zipWith(obs2, (integer, integer2) -> {
        if (!Objects.equals(integer, integer2)) {
            return Observable.just(integer).concatWith(Observable.just(integer2));
        } else {
            return Observable.empty();
        }
    })
      .flatMap(observable -> observable)
      .sorted()
      .forEach(System.out::println);

EDIT

Another approach would be to use a collection

    Observable<Integer> obs1 = Observable.just(1, 2, 4);
    Observable<Integer> obs2 = Observable.just(1, 3, 4, 5);


    obs1.mergeWith(obs2)
            .sorted()
            .reduce(new ArrayList<Integer>(), (integers, integer) -> {
                if (integers.contains(integer)) {
                    integers.remove(integer);
                } else {
                    integers.add(integer);
                }
                return integers;
            })
            .flatMapIterable(integers -> integers)
            .forEach(System.out::println);
0
Simon Baslé On

I don't think there is much in RxJava to do that symmetric difference "in real time"... Mainly because your starting assumption (the observables are sorted) cannot be made by generic operators, so few will help you with that.

However, you could try to code a custom operator for that purpose, by taking inspiration from sequenceEqual: it internally advances step-by-step between two publishers to perform equality comparison, so that's close to what you want to do.