How to perform Tying the Knot/define observable recursively using iteself in Rx.Net?

95 views Asked by At

Sometimes the business logic seems to be able to naturally modeled by some recursive defined observables. Here is one example:

interface Demo {
    IObservable<CommandId> userCommands;
    IObservable<IObservable<IProcessingState>> processes;
    IObservable<CommandId> skippedCommands;
    IObservable<(CommandId, CommandResult)> RunCommand(CommandId id);
}

interface IProcessingState {
    bool IsProcessing {get;}
    CommandId? ProcessingId {get;}
}

For each command user inputs, it should either trigger a running process in prcocess, or emit one value in skippedCommands. Some direct translate of this logic maybe

var validCommands = userCommands.WithLatestFrom(processes).Where(x => !x.Item2.IsProcessing)
var skippedCommands = userCommands.WithLatestFrom(processes).Where(x => x.Item2.IsProcessing)
var processes = validCommands.Select(c => RunCommand(c))

As code above indicated, assign of validCommands and processes are mutual recursive, we can equivalently define processes directly using itself recursively by

var processes = userCommands.WithLatestFrom(processes)
                            .Where(x => !x.Item2.IsProcessing)
                            .Select(c => RunCommand(c))

However we can not define prcesses Observable in C# like this.

I've found several possible related things:

  1. Observable.Generate constructor. However it seems that it is folding on its own state in a synchronize way, I don't know how to use userCommands observable and RunCommand in Observable.Generate;

  2. Some operator like exhaust or exhaustMap in RxJS, while Rx.Net didn't provide this operator, there are serval 3rd-party libraries provided these operators, like FSharp.Control.Reactive. The implementation is something like

let exhaustMap f source =
        Observable.Create (fun (o : IObserver<_>) -> 
            let mutable hasSubscription = false
            let mutable innerSub = None
            let onInnerCompleted () =
                hasSubscription <- false
                innerSub |> Option.iter Disposable.dispose
            let onOuterNext x =
                if not hasSubscription then
                    hasSubscription <- true 
                    f x |> subscribeSafeWithCallbacks 
                            o.OnNext o.OnError onInnerCompleted
                        |> fun y -> innerSub <- Some y
            source
            |> subscribeSafeWithCallbacks
                onOuterNext o.OnError o.OnCompleted)

However, there are two problems. a. directly use this operator does not fit requirements above, skipped commands will be silently ignored. We can modify source code a litter to fit the requirements, but there is still another problem b. the implementation introduced two local mutable variables, and two nested subscription. I don't know if this is ok in all cases (will there be risk of data racing?), and prefer solutions based on composition of operators other than mutable references

  1. SodiumFRP provided forward references types StreamLoop and CellLoop. And by the Functional Reactive Programming book, Rx alternative for these forward references types would be Subject, by using Subject the recursive construct above is separated into two phases. The problem is by Intro to Rx indicated, using Subject requires manually manage more state, at least dispose of the subject is required, and maybe forced to hot observables. I'm wondering if there exists solutions without using Subject

  2. Using window operator with boundary on last value (just before finish) on RunCommand results, the processes above can some how be constructed, but this solution need to use ending signal twice, which requires careful treatment (quiet a while on trying and tuning Take(1), zip, withLatestFrom, combineLatest, overloads of Window operators to get desired result) on simultaneous events.

Are there better solutions or modification to solutions above to this problem, especially only use operators?

1

There are 1 answers

4
Shlomo On

Your types are all weird and hard to work with. Your question is fundamentally a simple state machine with two triggers: 1) New command arrives, 2) Previous command finishes executing.

This should get you started:

void Main()
{
    IObservable<ICommand> source = new Subject<ICommand>();
    var executionTerminatedLoopback = new Subject<Unit>();
    
    var stateMachine = source
        .Select(c => (command: c, type: 1))
        .Merge(executionTerminatedLoopback.Select(_ => (command: (ICommand)null, type: 2)))
        .Scan((isExecuting: false, validCommand: (ICommand)null, failedCommand: (ICommand)null), (state, msg) => {
            if(msg.type == 2)
                return (false, null, null);
            if(state.isExecuting)
                return (true, null, msg.command);
            else
                return (true, msg.command, null);
        });
    
    var validCommands = stateMachine.Where(t => t.validCommand != null).Select(t => t.validCommand);
    var failedCommands = stateMachine.Where(t => t.failedCommand != null).Select(t => t.failedCommand);
    
    validCommands.SelectMany(c => c.Execute()).Subscribe(executionTerminatedLoopback);
}

public interface ICommand{ 
    IObservable<Unit> Execute();
}

source here doesn't have to be a subject, and probably shouldn't be: It can be any IObservable<ICommand>. It's just easier to mock up in an answer. The exeuctionTerminatedLoopback though does have to be, to (as you put it) break up the recursion into two parts. Because it's a subject, it should be kept private and not leaked out.

I don't think there are any answers in C# without using Subject.