Join on PortSets in CCR


I have 4 services, each with a method whose signature looks like this:

PortSet<Response1, Exception> GetData1(Request1 request);
PortSet<Response2, Exception> GetData2(Request2 request);
PortSet<Response3, Exception> GetData3(Request3 request);
PortSet<Response4, Exception> GetData4(Request4 request);

I need to run them concurrently and join the final results, handling the result or exception that arrives on each port separately. How can I do this?

I could only find a way to join results from Ports, not from PortSets.

Thanks

Answer by spender (best answer)

Your solution is somewhat limited by the number of classes involved. I'd suggest activating a Choice receiver on each PortSet and, in each handler, posting to a completion port. On the completion port, you can use a Join that fires once all of them have completed. So, off the top of my head, and assuming you are deriving from CcrServiceBase (otherwise you'll need to use Arbiter.Activate instead of the terser Activate):

var cPort=new Port<EmptyValue>();
Activate(GetData1(myRequest1)
    .Choice(r1=>{
        Process(r1);
        cPort.Post(EmptyValue.SharedInstance);
    },ex=>{
        Process(ex);
        cPort.Post(EmptyValue.SharedInstance);
    })); // etc., once for each of the 4 calls

Activate(cPort.Join(4,e=>{
    //all completed. Proceed here...
}));
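For anyone not deriving from CcrServiceBase, here is a rough sketch of the same completion-port idea written against the static Arbiter methods. It uses two calls rather than four to stay short. The stub GetData1/GetData2 methods, the class names and the dispatcher names are made up for illustration, and the Arbiter signatures are as I remember them, so check them against your CCR version. Arbiter.MultipleItemReceive on the completion port plays the role of the Join above.

```csharp
using System;
using System.Threading;
using Microsoft.Ccr.Core;

class Response1 { }
class Response2 { }

static class CompletionPortSketch
{
    // Hypothetical stand-in for a service call that succeeds.
    static PortSet<Response1, Exception> GetData1()
    {
        var ps = new PortSet<Response1, Exception>();
        ps.Post(new Response1());
        return ps;
    }

    // Hypothetical stand-in for a service call that fails.
    static PortSet<Response2, Exception> GetData2()
    {
        var ps = new PortSet<Response2, Exception>();
        ps.Post(new InvalidOperationException("simulated failure"));
        return ps;
    }

    static void Main()
    {
        using (var dispatcher = new Dispatcher(0, "sketch"))
        using (var queue = new DispatcherQueue("sketch queue", dispatcher))
        {
            var allDone = new ManualResetEvent(false);
            var cPort = new Port<EmptyValue>();

            Arbiter.Activate(queue,
                // One Choice per PortSet: exactly one of the two handlers runs,
                // and either way a completion signal is posted.
                Arbiter.Choice(GetData1(),
                    r => { Console.WriteLine("Response1 received"); cPort.Post(EmptyValue.SharedInstance); },
                    ex => { Console.WriteLine("GetData1 failed: " + ex.Message); cPort.Post(EmptyValue.SharedInstance); }),
                Arbiter.Choice(GetData2(),
                    r => { Console.WriteLine("Response2 received"); cPort.Post(EmptyValue.SharedInstance); },
                    ex => { Console.WriteLine("GetData2 failed: " + ex.Message); cPort.Post(EmptyValue.SharedInstance); }),
                // Runs once, after two completion signals have arrived.
                Arbiter.MultipleItemReceive(false, cPort, 2, signals => allDone.Set()));

            // Block the main thread only so the console sketch does not exit early.
            allDone.WaitOne();
        }
    }
}
```

Because every handler posts to cPort whether the call succeeded or failed, the final receiver fires regardless of how many calls threw.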

If you had a common Response type, you could instead structure your calls as follows:

var ps=new PortSet<Response,Exception>();
GetData1(request1,ps);
GetData2(request2,ps);
GetData3(request3,ps);
GetData4(request4,ps);

So instead of each GetData call creating a new PortSet, you create one common PortSet and pass it to the GetData methods.

Now, you can perform a multiple item receive:

ps.MultipleItemReceive(4,
    responses=> {
        foreach(var response in responses)
        {
            //process response
        }
    },
    exceptions=> {
        foreach(var exception in exceptions)
        {
            //process exception
        }
    });
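As a hedged, fuller sketch of the shared-PortSet variant using the static Arbiter API: as far as I recall, Arbiter.MultipleItemReceive for a two-type PortSet takes a single handler that receives both collections at once, rather than two separate handlers, so verify the exact overload against your CCR version. The GetData helper, its parameters and the Response.Source field are hypothetical stand-ins for your services.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using Microsoft.Ccr.Core;

class Response { public string Source; }

static class SharedPortSetSketch
{
    // Hypothetical GetData variant: posts onto the caller's PortSet
    // instead of creating and returning its own.
    static void GetData(string source, bool fail, PortSet<Response, Exception> ps)
    {
        if (fail) ps.Post(new InvalidOperationException(source + " failed"));
        else ps.Post(new Response { Source = source });
    }

    static void Main()
    {
        using (var dispatcher = new Dispatcher(0, "sketch"))
        using (var queue = new DispatcherQueue("sketch queue", dispatcher))
        {
            var allDone = new ManualResetEvent(false);
            var ps = new PortSet<Response, Exception>();

            // The handler runs once a total of 4 items (responses plus
            // exceptions, in any mix) have been posted to ps.
            Arbiter.Activate(queue,
                Arbiter.MultipleItemReceive(ps, 4,
                    (ICollection<Response> responses, ICollection<Exception> exceptions) =>
                    {
                        foreach (var r in responses) Console.WriteLine("ok: " + r.Source);
                        foreach (var e in exceptions) Console.WriteLine("error: " + e.Message);
                        allDone.Set();
                    }));

            GetData("service1", false, ps);
            GetData("service2", false, ps);
            GetData("service3", true, ps);
            GetData("service4", false, ps);

            // Block the main thread only so the console sketch does not exit early.
            allDone.WaitOne();
        }
    }
}
```

The trade-off is that the handler sees the results in bulk, so if you need to know which service produced which item, the Response type has to carry that information itself.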