Observable mediator with Rx.NET

754 views Asked by At
class SomeMediator
{
  public string[] Inputs { get; private set; }
  public ISubject<string> Response { get; private set; }
  
  public SomeMediator(string[] inputs)
  {
    Inputs = inputs;
    Response = new Subject<string>();
  }
}

class SomeProcessor
{
  public SomeProcessor(SomeMediator mediator)
  {
    foreach (var input in mediator.Inputs) 
    {
      var clock = new Timer();
      
      clock.Interval = 1000;
      clock.Enabled = true;
      clock.Elapsed += new ElapsedEventHandler(
        (object source, ElapsedEventArgs e) => 
          mediator.Response.OnNext(string.Empty));
    }
  }
}

class SomeRenderer 
{
  public SomeRenderer(SomeMediator mediator)
  {
    mediator.Subscribe(o => Console.WriteLine(o));
  }
}

Usage

var mediator = new SomeMediator(new[] { "Input#1", "Input#2" });
var subscriber = SomeRenderer(mediator);
var processor = SomeProcessor(mediator);

The code above injects SomeMediator with 2 input parameters into SomeProcessor and subscribes SomeRenderer to all responses. Based on 2 inputs, SomeProcessor creates 2 Timer threads and starts sending notifications to SomeMediator. Each Timer asynchronously calls SomeMediator.Response.OnNext and I'm getting a warning below

Warning: 0 : OnNext called while another OnNext call was in progress on the same Observer

As a result, the second OnNext message gets lost, so SomeMediator receives only 1 response from SomeProcessor instead of 2.

The question

Is it possible to implement mediator interaction through observable and prevent OnNext conflicts caused by different threads?

Update - Example

Let's say, there is a stock trading app that has some abstract generic Core functionality that can be reused by other apps. This core can include some market data formatter (mediator) to turn API response of various formats into a unified model (response) and visualize it (renderer). The response is supposed to be received from different gateways (processors) that will have specific implementation and will depend on the API provided by the brokerage firm.

In other words, we have multiple processors, like SomeProcessor1, SomeProcessor2, SomeProcessor3 that will act as API wrappers. They accept some input parameters for the HTTP request (inputs) and return a response. Inputs are used by API processors. Mediator only gets the result and sends it to the visualizer. Mediator has Subject that is passed to each Processor and allows Renderer to subscribe to these changes. Every time some processor sends an update, the renderer should receive this notification through the mediator and show it in UI.

Processor1      Processor2     Processor3        (API)
            \       |        /
                 Mediator                        (Aggregate and Format)
               /         \
         Logger           Renderer               (Subscribed tasks - Log and Display)
0

There are 0 answers