Publish/Consume: Wait on Subscribe, filter messages and Dispose

166 views Asked by At

Using Rx.Net 3

With use of the Quartz.Net Scheduler I build a workflow manager to chain jobs (using Quartz Joblistener on finished jobs) with an embedded web server. The application instantiates an instance of a Subject (singleton).

A web service takes data and starts a workflow, injecting a unique ID. This unique ID is propagated through the workflow. The Joblistener delegate detects the end of a specific job and calls OnNext on the injected Subject instance with a Type holding the unique ID and an DB table ID.

The idea was that the web service on each call subscribes to the Subject and waits for incoming messages/events and filters them on the unique ID. When found disposes the subscription, collects and returns the generated data to the caller.

How can I make my Subscribe() wait for incoming messages, filter them and Dispose(), without finishing the web service prematurely.

2

There are 2 answers

0
Fritz Herbers On BEST ANSWER
// model
public class AsyncCommunicationObject
{
    public string Key { get; }
    public string Value { get; }

    public AsyncCommunicationObject(string key, string value)
    {
        Key = key;
        Value = value;
    }
}

// injectable singleton 
public static Subject<AsyncCommunicationObject> AsyncCommunication { get; set; } = new Subject<AsyncCommunicationObject>();

// in web service           
System.Threading.EventWaitHandle waitHandle = new System.Threading.AutoResetEvent(false);
string yourID = some ID

var subscription = _asyncCommunication (injected)
    .Where(x => x.Key == yourID)
    .Take(1)
    .Subscribe(
        x =>
        {
            dbId = x.Value;
            waitHandle.Set();
        }
    );

_schedulerCore.ExecuteJob(upload.JobId, jobDataMap);

waitHandle.WaitOne();
waitHandle.Reset();

subscription.Dispose();

// in job listener
_asyncCommunication.OnNext(new AsyncCommunicationObject(your ID, some value)
1
Asti On

You don't need to dispose of the subscription manually. Any bounding operator like Take or First signals OnCompleted which causes the sequence to be disposed. You can also use await observables to make avoid writing callbacks.

For example, dbId = await AsyncCommunication.FirstAsync(x => x.Key == id)