Combining multiple change streams in ReactFX

252 views Asked by At

Question

How do you correctly combine multiple property change streams in ReactFX for use in UndoFX (or any use case)?

Details

Here's a short explanation of what I'm trying to accomplish (full example code is posted at GitHub):

There is an example model that has two properties. For simplicity's sake, they are double properties

public class DataModel {
    private DoubleProperty a, b;
    //...
    //with appropriate getters, setters, equals, hashcode
    //...
}

In the example code, there are buttons to change either one or both of the properties. I would like to undo changes to both if that's what the change is.

Per the UndoFX example, there are also change classes for each (also abbreviated here) that inherit from a base class:

public abstract class ChangeBase<T> implements UndoChange {
   protected final T oldValue, newValue;
   protected final DataModel model;

   protected ChangeBase(DataModel model, T oldValue, T newValue) {
      this.model = model;
      this.oldValue = oldValue;
      this.newValue = newValue;
   }

   public abstract ChangeBase<T> invert();
   public abstract void redo();

   public Optional<ChangeBase<?>> mergeWith(ChangeBase<?> other) {
      return Optional.empty();
   }

   @Override
   public int hashCode() {
      return Objects.hash(this.oldValue, this.newValue);
   }      

   @Override
   public boolean equals(Object obj) {
      if (this == obj) {
         return true;
      }
      if (obj == null) {
         return false;
      }
      if (getClass() != obj.getClass()) {
         return false;
      }
      final ChangeBase<?> other = (ChangeBase<?>) obj;
      if (!Objects.equals(this.oldValue, other.oldValue)) {
         return false;
      }
      if (!Objects.equals(this.newValue, other.newValue)) {
         return false;
      }
      if (!Objects.equals(this.model, other.model)) {
         return false;
      }
      return true;
   }
}

public class ChangeA extends ChangeBase<Double> {
   //...
   //constructors and other method implementations
   //..
   @Override
   public void redo() {
      System.out.println("ChangeA redo "+this);
      this.model.setA(this.newValue);
   }   
}

public class ChangeB extends ChangeBase<Double> {
   //...
   //constructors and other method implementations
   //...
   @Override
   public void redo() {
      System.out.println("ChangeA redo "+this);
      this.model.setB(this.newValue);
   }   
}

All changes implement an interface

public interface UndoChange {
   public void redo();
   public UndoChange invert();
   public Optional<UndoChange> mergeWith(UndoChange other);

}

After reading up on documentation, I started by creating an event stream for to capture changes to each property:

EventStream<UndoChange> changeAStream = 
    EventStreams.changesOf(model.aProperty())
              .map(c -> new ChangeA(model, (Change<Number>)c));
EventStream<UndoChange> changeBStream = 
    EventStreams.changesOf(model.bProperty())
              .map(c -> new ChangeB(model, (Change<Number>)c));

My first attempt was to merge the streams like so

EventStream<UndoChange> bothStream = EventStreams.merge(changeAStream, changeBStream); 

What happens in this case is that if A and B properties are changed at the same time, the stream will have two changes in it, and each will undo separately instead of together. Each call to a setter puts a change in the appropriate stream, which are then emitted to bothStream, which then contains two separate events instead of one.

After some more reading I then tried to combine the streams and map into a separate change object:

 EventStream<UndoChange> bothStream = EventStreams.combine(changeAStream, changeBStream).map(ChangeBoth::new); 

where ChangeBoth is defined as:

public class ChangeBoth implements UndoChange {

   private final ChangeA aChange;
   private final ChangeB bChange;

   public ChangeBoth(ChangeA ac, ChangeB bc) {
      this.aChange = ac;
      this.bChange = bc;
   }

   public ChangeBoth(Tuple2<UndoChange, UndoChange> tuple) {
      this.aChange = ((ChangeBoth)tuple.get1()).aChange;
      this.bChange = ((ChangeBoth)tuple.get2()).bChange;
   }

   @Override
   public UndoChange invert() {
      System.out.println("ChangeBoth invert "+this);
      return new ChangeBoth(new ChangeA(this.aChange.model, this.aChange.newValue, this.aChange.oldValue), 
                            new ChangeB(this.bChange.model, this.bChange.newValue, this.bChange.oldValue));
   }

   @Override
   public void redo() {
      System.out.println("ChangeBoth redo "+this);
      DataModel model = this.aChange.model;
      model.setA(this.aChange.newValue);
      model.setB(this.bChange.newValue);
   }

   //...
   // plus appropriate mergeWith, hashcode, equals
   //...
}

This results in a IllegalStateException: Unexpected change received being thrown. After some digging, I determined why this is happening: when the ChangeBoth is being undone (via invert() and redo() calls), it sets each property back to the old value. However, when it sets each property, the change is emitted back through the streams resulting in a new ChangeBoth being put in the stream in between setting the two properties back to old values.

Summary

So to get back to my question: what's the correct way to do this? Is there a way to combine to change streams for two properties into one change object that doesn't cause this problem?

Edit - Attempt 1

Per Tomas's answer, I added/changed the following code (NOTE: Code in repo has been updated):

changeAStream and changeBstream remain the same.

Instead of combining the stream, I did as Tomas suggested and created a binary operator to reduce two changes to one:

BinaryOperator<UndoChange> abOp = (c1, c2) -> {
   ChangeA ca = null;
   if(c1 instanceof ChangeA) {
      ca = (ChangeA)c1;
   }
   ChangeB cb = null;
   if(c2 instanceof ChangeB) {
      cb = (ChangeB)c2;
   }
   return new ChangeBoth(ca, cb);
};

and changed the event stream to

SuspendableEventStream<UndoChange> bothStream = EventStreams.merge(changeAStream, changeBStream).reducible(abOp);

Now the button action isn't implemented in setonAction but is instead handled with an event stream

  EventStreams.eventsOf(bothButton, ActionEvent.ACTION)
          .suspenderOf(bothStream)
          .subscribe((ActionEvent event) ->{
             System.out.println("stream action");
              model.setA(Math.random()*10.0);
              model.setB(Math.random()*10.0);                 
          });

This works great for combining events appropriately, but undoing still breaks for A+B changes. It works for separate A and B changes. Here's an example of two A+B changes and then an undo

A+B Button Action in event stream
Change in A stream
Change in B stream
A+B Button Action in event stream
Change in A stream
Change in B stream
ChangeBoth attempting merge with combinedeventstreamtest.ChangeBoth@775ec8e8... merged
undo 6.897901340713284  2.853416510829745
ChangeBoth invert combinedeventstreamtest.ChangeBoth@aae83334
ChangeA invert combinedeventstreamtest.ChangeA@32ee049a
ChangeB invert combinedeventstreamtest.ChangeB@4919dd13
ChangeBoth redo combinedeventstreamtest.ChangeBoth@b2155b1e
Change in A stream
Exception in thread "JavaFX Application Thread" java.lang.IllegalArgumentException: Unexpected change received.
Expected:
combinedeventstreamtest.ChangeBoth@b2155b1e
Received:
combinedeventstreamtest.ChangeA@2ad21e08
Change in B stream

Edit - Attempt 2 - Success!!

Tomas was nice enough to point out the solution (which I should have realized). Just suspend while doing redo():

  UndoManager<UndoChange> um = UndoManagerFactory.unlimitedHistoryUndoManager(
          bothStream, 
          c -> c.invert(), 
          c -> bothStream.suspendWhile(c::redo),
          (c1, c2) -> c1.mergeWith(c2)
  );
1

There are 1 answers

3
Tomas Mikula On BEST ANSWER

So the task is to combine into one the changes that have been emitted from bothStream during handling a button click.

You will need a function for reducing two UndoChanges into one:

BinaryOperator<UndoChange> reduction = ???; // implement as appropriate

Make bothStream reduce changes into one while "suspended":

SuspendableEventStream<UndoChange> bothStream =
        EventStreams.merge(changeAStream, changeBStream).reducible(reduction);

Now you just need to suspend bothStream while processing the button click. This can be done like this:

EventStreams.eventsOf(bothButton, ActionEvent.ACTION) // Observe actions of bothButton ...
        .suspenderOf(bothStream)                      // but suspend bothStream ...
        .subscribe((ActionEvent event) -> {           // before handling the action.
            model.setA(Math.random()*10.0);
            model.setB(Math.random()*10.0);
        })

Also suspend bothStream when undoing/redoing the change from within the undo manager, so that exact inverse of the (combined) change is emitted from bothStream when undoing a combined change (which is necessary to make UndoManager happy). This can be done by wrapping the apply argument to UndoManager constructor in bothStream.suspendWhile(), e.g.:

UndoManagerFactory.unlimitedHistoryUndoManager(
        bothStream, 
        c -> c.invert(), 
        c -> bothStream.suspendWhile(c::redo)  // suspend while applying the change
)