Sometimes business logic seems to be naturally modeled by recursively defined observables. Here is one example:
    interface Demo {
        IObservable<CommandId> userCommands { get; }
        IObservable<IObservable<IProcessingState>> processes { get; }
        IObservable<CommandId> skippedCommands { get; }
        IObservable<(CommandId, CommandResult)> RunCommand(CommandId id);
    }
    interface IProcessingState {
        bool IsProcessing { get; }
        CommandId? ProcessingId { get; }
    }
Each command the user inputs should either trigger a running process in processes or emit one value in skippedCommands. A direct translation of this logic might be:
    var validCommands = userCommands.WithLatestFrom(processes).Where(x => !x.Item2.IsProcessing);
    var skippedCommands = userCommands.WithLatestFrom(processes).Where(x => x.Item2.IsProcessing);
    var processes = validCommands.Select(c => RunCommand(c));
As the code above shows, the definitions of validCommands and processes are mutually recursive. Equivalently, we can define processes directly in terms of itself:
    var processes = userCommands.WithLatestFrom(processes)
        .Where(x => !x.Item2.IsProcessing)
        .Select(c => RunCommand(c));
However, we cannot define the processes observable like this in C#.
I've found several possibly related things:
- The Observable.Generate constructor. However, it seems to fold over its own state synchronously, and I don't know how to use the userCommands observable and RunCommand in Observable.Generate.
- Operators like exhaust or exhaustMap in RxJS. Rx.NET doesn't provide these operators, but several third-party libraries do, such as FSharp.Control.Reactive. The implementation is something like:
    let exhaustMap f source =
        Observable.Create (fun (o : IObserver<_>) ->
            let mutable hasSubscription = false
            let mutable innerSub = None
            let onInnerCompleted () =
                hasSubscription <- false
                innerSub |> Option.iter Disposable.dispose
            let onOuterNext x =
                if not hasSubscription then
                    hasSubscription <- true
                    f x |> subscribeSafeWithCallbacks
                            o.OnNext o.OnError onInnerCompleted
                        |> fun y -> innerSub <- Some y
            source
            |> subscribeSafeWithCallbacks
                onOuterNext o.OnError o.OnCompleted)
However, there are two problems. (a) Using this operator directly does not fit the requirements above: skipped commands are silently ignored. We could modify the source code a little to fit the requirements, but another problem remains. (b) The implementation introduces two local mutable variables and two nested subscriptions. I don't know whether this is OK in all cases (is there a risk of data races?), and I would prefer a solution based on composing operators rather than on mutable references.
- SodiumFRP provides the forward-reference types StreamLoop and CellLoop. According to the Functional Reactive Programming book, the Rx alternative to these forward-reference types would be Subject; using a Subject separates the recursive construction above into two phases. The problem, as Intro to Rx points out, is that using Subject requires managing more state manually: at the least, the subject has to be disposed, and it may force the observables to be hot. I'm wondering whether a solution exists that does not use Subject.
- Using the window operator with a boundary on the last value (just before completion) of the RunCommand results, the processes above can somehow be constructed. But this solution needs to use the ending signal twice, which requires careful treatment of simultaneous events (I spent quite a while trying and tuning Take(1), zip, withLatestFrom, combineLatest, and overloads of the Window operator to get the desired result).
Are there better solutions to this problem, or modifications to the solutions above, especially ones that use only operators?
Your types are all weird and hard to work with. Your question is fundamentally a simple state machine with two triggers: 1) New command arrives, 2) Previous command finishes executing.
This should get you started:
source here doesn't have to be a subject, and probably shouldn't be: it can be any IObservable<ICommand>. It's just easier to mock up in an answer. The exeuctionTerminatedLoopback though does have to be, to (as you put it) break up the recursion into two parts. Because it's a subject, it should be kept private and not leaked out.
I don't think there are any answers in C# without using Subject.