Rx Observable of state, when state transition can fail

Given a computation that indefinitely generates a new value from the previous one and an incoming event, and that may also fail, what is the best way to obtain two Observables:

  • potentially infinite, for successful transformations,
  • for error reporting?

Details

I'm trying to model a state that is changed by events from a ScalaFX GUI, and I'm also trying RxScala for the first time. Simplifying things:

trait State {
  def computeNext(basedOn: Any): State
}

sealed trait UserEvent // Has some case classes

class StateProcessor(initialState: State, events: Observable[UserEvent]) {
  ...
}

The idea is to create an Observable[State] that starts from the initial state and moves to the next state on each event. Simple enough:

val observableState = events.scan(initialState){(state, ev) =>
  // compute and return next state
  state computeNext ???
}

The problem is that computing the next state involves I/O and may fail, so computeNext has to return Try[State]:

trait State {
  def computeNext(basedOn: Any): Try[State]
}

Now, obviously, I cannot use scan the same way anymore. In general, I want to have two Observables:

class StateProcessor(initialState: State, events: Observable[UserEvent]) {
  val observableState: Observable[State] = ???
  val observableExc: Observable[Exception] = ???
}

with the rule that if Try[State] is a failure, observableState simply doesn't emit anything, nor does it report an error. The next state is then based on the last successful computation.

Which is the most elegant way to solve this situation?
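
To make the rule above concrete, here is a minimal sketch of one possible shape, not a definitive answer. It keeps scan, but accumulates a pair: the last good state plus the outcome of the latest transition. The two outputs are then derived by filtering on that outcome. To stay runnable with the standard library alone, it uses a List and scanLeft in place of an Observable. Everything named Add, Boom, Step, step and StateSketch is hypothetical, and State is a toy stand-in for the real I/O-backed one. Errors are typed as Throwable here because that is what Failure carries.

```scala
import scala.util.{Try, Success, Failure}

object StateSketch {
  // Hypothetical stand-ins for the question's types.
  sealed trait UserEvent
  case class Add(n: Int) extends UserEvent
  case object Boom extends UserEvent

  case class State(total: Int) {
    // Toy replacement for the I/O-backed transition that may fail.
    def computeNext(basedOn: UserEvent): Try[State] = basedOn match {
      case Add(n) => Success(State(total + n))
      case Boom   => Failure(new RuntimeException("transition failed"))
    }
  }

  // Accumulator: (last good state, outcome of the latest transition).
  type Step = (State, Try[State])

  // On success, advance; on failure, keep the last good state
  // and carry the failure along so it can be reported separately.
  def step(acc: Step, ev: UserEvent): Step = {
    val (current, _) = acc
    current.computeNext(ev) match {
      case ok @ Success(next)  => (next, ok)
      case failed @ Failure(_) => (current, failed)
    }
  }

  val initialState = State(0)
  val seed: Step = (initialState, Success(initialState))

  // A List stands in for the event stream in this sketch.
  val events: List[UserEvent] = List(Add(1), Boom, Add(2))
  val steps: List[Step] = events.scanLeft(seed)(step)

  // Only successful transitions produce a state; failures produce an error.
  val states: List[State] = steps.collect { case (_, Success(s)) => s }
  val errors: List[Throwable] = steps.collect { case (_, Failure(e)) => e }
}
```

With RxScala, the same step function could presumably be passed to events.scan(seed)(step), with the two Observables derived from the scanned stream via collect. Both outputs would then subscribe to the same scanned stream, so it would likely need to be shared (for example with share) to avoid running the I/O twice per event.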
