class SomeMediator
{
public string[] Inputs { get; private set; }
public ISubject<string> Response { get; private set; }
public SomeMediator(string[] inputs)
{
Inputs = inputs;
Response = new Subject<string>();
}
}
class SomeProcessor
{
public SomeProcessor(SomeMediator mediator)
{
foreach (var input in mediator.Inputs)
{
var clock = new Timer();
clock.Interval = 1000;
clock.Enabled = true;
clock.Elapsed += new ElapsedEventHandler(
(object source, ElapsedEventArgs e) =>
mediator.Response.OnNext(string.Empty));
}
}
}
class SomeRenderer
{
public SomeRenderer(SomeMediator mediator)
{
mediator.Subscribe(o => Console.WriteLine(o));
}
}
Usage
var mediator = new SomeMediator(new[] { "Input#1", "Input#2" });
var subscriber = SomeRenderer(mediator);
var processor = SomeProcessor(mediator);
The code above injects SomeMediator
with 2 input parameters into SomeProcessor
and subscribes SomeRenderer
to all responses.
Based on 2 inputs, SomeProcessor
creates 2 Timer
threads and starts sending notifications to SomeMediator
.
Each Timer
asynchronously calls SomeMediator.Response.OnNext
and I'm getting a warning below
Warning: 0 : OnNext called while another OnNext call was in progress on the same Observer
As a result, the second OnNext
message gets lost, so SomeMediator
receives only 1 response from SomeProcessor
instead of 2.
The question
Is it possible to implement mediator interaction through observable and prevent OnNext
conflicts caused by different threads?
Update - Example
Let's say, there is a stock trading app that has some abstract generic Core
functionality that can be reused by other apps. This core can include some market data formatter (mediator) to turn API response of various formats into a unified model (response) and visualize it (renderer). The response is supposed to be received from different gateways (processors) that will have specific implementation and will depend on the API provided by the brokerage firm.
In other words, we have multiple processors, like SomeProcessor1
, SomeProcessor2
, SomeProcessor3
that will act as API wrappers. They accept some input parameters for the HTTP request (inputs) and return a response. Inputs are used by API processors. Mediator only gets the result and sends it to the visualizer. Mediator has Subject
that is passed to each Processor
and allows Renderer
to subscribe to these changes. Every time some processor sends an update, the renderer should receive this notification through the mediator and show it in UI.
Processor1 Processor2 Processor3 (API)
\ | /
Mediator (Aggregate and Format)
/ \
Logger Renderer (Subscribed tasks - Log and Display)