IObservable - Replacing AutoResetEvent

442 views Asked by At

Just wondering how I can replace the AutoResetEvent in the below? I was trying to think how to do it the RX way or with tasks, but I can see how to do it.

public void LogOnResponse LogOn()
{

    LogOnResponse logOnResponse = null;
    var waitEvent = new AutoResetEvent(false);

    var listener = _connection.LoginStatusEvent
                        .Where(x => x.LoginState == LoginState.LoggedOn 
                                    || x.LoginState == LoginState.LoggedRejected);

    listener.Subscribe(x => {
                            logOnResponse = new LogOnResponse();
                            logOnResponse.InformationMessage = x.Message;
                            logOnResponse.IsAuthenticated = x.LoginState == LoginState.LoggedOn;
                            waitEvent.Set();
                        });

    connection.Login(connectionInfo);

    waitEvent.WaitOne(2000);

    return logOnResponse;
}
2

There are 2 answers

6
Scott Weinstein On BEST ANSWER

Consider structuring the code like so:

var listener = _velocityConnection.LoginStatusEvent
    .Where(x => x.LoginState == LoginState.LoggedOn || x.LoginState == LoginState.LoggedRejected);

var logOnResponse = listener.Select(x =>
    new LogOnResponse() {
        logOnResponse.InformationMessage = x.Message;
        logOnResponse.IsAuthenticated = x.LoginState == LoginState.LoggedOn;
    }
).Timeout(TimeSpan.FromSeconds(2), Observable.Return(new LogOnResponse() {...}))
    .Publish()
    .RefCount();

connection.Login(connectionInfo);
return logOnResponse.First();

We use Publish to so that order of execution doesn't matter, and RefCount() to hide the ConnectableObservable

0
James Hay On

I'd have the method return an IObservable<LogOnResponse> so you don't need to wait until your ready to return. You'll also have to watch out for race conditions as your call to Login could complete before you have had a chance to subscribe. Something like this should handle the race conditions and give you back a response through the returned IObservable. You can then obviously subscribe to this to get the LogOnResponse as soon as it turns up.

public IObservable<LogOnResponse> LogOn()
{
    return Observable.CreateWithDisposable<LogOnResponse>(observer =>
    {
       var loginDisposable = _velocityConnection.LoginStatusEvent
                        .Where(x => x.LoginState == LoginState.LoggedOn 
                                    || x.LoginState == LoginState.LoggedRejected)
                        .Select(x => {
                            logOnResponse = new LogOnResponse();
                            logOnResponse.InformationMessage = x.Message;
                            logOnResponse.IsAuthenticated = x.LoginState == LoginState.LoggedOn;
                            return logOnResponse;
                        }).Take(1).Subscribe(observer);

       connection.Login(connectionInfo);

       return loginDisposable;
   });
}

One thing to note here is that calling this method only returns the IObservable. The actual login won't be called until you subscribe.

// Gets the IObservable<LogOnResult>
var logonResponseAsObservable = LogOn(); 
//Execute the logon and wait for a response asynchronously
logonResponseAsObservable.Subscribe(response => HandleLogOnResponse(response));