How to clear the outgoing message buffers at the Server?

1.2k views Asked by At

I've written a service using PollingDuplexHttpBinding which has a Silverllight client that consumes it. My Service basically has a given number of clients sending their data (quite often, every second, and the data is quite large, each call is around 5KB) to the service, and also listening for new data sent by other clients to the service to be routed to them, very similar to a chat room architecture.

The problem I'm noticing is that when clients connect to the service over the internet, after a few minutes the service's response becomes slow and the replies become lagged. I've come to the conclusion that when the service hosts upload capacity is reached (internet upload speed, on the server its about 15KB/s only), the messages sent by other clients are buffered and processed accordingly when there is bandwidth available. I'm wondering how exactly can I limit the seize of this buffer that the service uses to store the received messages from the clients? It's not so critical that my clients get all the data, but rather that they get the latest data sent by others, so real time connectivity is what i'm looking for at the cost of guaranteed delivery.

In short I want to be able to clean my queue/buffer at the service whenever it fills up, or a certain cap is reached and start filling it again with the received calls to get rid of the delay. How do I do this? Is the MaxBufferSize property what I need to decrease on the service side as well as on the client side? Or do I need to code this functionality in my service? Any ideas?

Thanks.

EDIT:

Here is my service architecture:

//the service
[ServiceContract(Namespace = "", CallbackContract = typeof(INewsNotification))]
[AspNetCompatibilityRequirements(RequirementsMode = AspNetCompatibilityRequirementsMode.Allowed)]
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Multiple, InstanceContextMode = InstanceContextMode.Single)]
public class NewsService
{


private static Dictionary<IChatNotification, string> clients = new Dictionary<IChatNotification, string>();
private ReaderWriterLockSlim subscribersLock = new ReaderWriterLockSlim();

[OperationContract(IsOneWay = true)]
public void PublishNotifications(byte[] data)
{
            try
            {
                subscribersLock.EnterReadLock();
                List<INewsNotification> removeList = new List<INewsNotification>();
                lock (clients)
                {
                    foreach (var subscriber in clients)
                    {
                        if (OperationContext.Current.GetCallbackChannel<IChatNotification>() == subscriber.Key)
                        {
                            continue;
                        }
                        try
                        {
                            subscriber.Key.BeginOnNotificationSend(data, GetCurrentUser(), onNotifyCompletedNotificationSend, subscriber.Key);

                        }
                        catch (CommunicationObjectAbortedException)
                        {
                            removeList.Add(subscriber.Key);
                        }
                        catch (CommunicationException)
                        {
                            removeList.Add(subscriber.Key);
                        }
                        catch (ObjectDisposedException)
                        {
                            removeList.Add(subscriber.Key);
                        }

                    }
                }

                foreach (var item in removeList)
                {
                    clients.Remove(item);
                }
            }
            finally
            {
                subscribersLock.ExitReadLock();
            }
        }

}


//the callback contract
[ServiceContract]
public interface INewsNotification
{
       [OperationContract(IsOneWay = true, AsyncPattern = true)]
       IAsyncResult BeginOnNotificationSend(byte[] data, string username, AsyncCallback callback, object asyncState);
       void EndOnNotificationSend(IAsyncResult result);
}

The service config:

  <system.serviceModel>
    <extensions>
      <bindingExtensions>
        <add name="pollingDuplex" type="System.ServiceModel.Configuration.PollingDuplexHttpBindingCollectionElement, System.ServiceModel.PollingDuplex, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35" />
      </bindingExtensions>
    </extensions>
    <behaviors>
      <serviceBehaviors>
        <behavior name="">

          <serviceMetadata httpGetEnabled="true" />
          <serviceThrottling maxConcurrentSessions="2147483647" />
          <serviceDebug includeExceptionDetailInFaults="true" />
        </behavior>
      </serviceBehaviors>
    </behaviors>
    <bindings>
      <pollingDuplex>

        <binding name="myPollingDuplex" duplexMode="SingleMessagePerPoll" 
                 maxOutputDelay="00:00:00" inactivityTimeout="02:00:00" 
                 serverPollTimeout="00:55:00" sendTimeout="02:00:00"  openTimeout="02:00:00" 
                  maxBufferSize="10000"  maxReceivedMessageSize="10000" maxBufferPoolSize="1000"/>

      </pollingDuplex>
    </bindings>
    <serviceHostingEnvironment aspNetCompatibilityEnabled="true" multipleSiteBindingsEnabled="true" />
    <services>
      <service name="NewsNotificationService.Web.NewsService">
        <endpoint address="" binding="pollingDuplex" bindingConfiguration="myPollingDuplex" contract="NewsNotificationService.Web.NewsService" />
        <endpoint address="mex" binding="mexHttpBinding" contract="IMetadataExchange" />
      </service>
    </services>
  </system.serviceModel>
    <system.webServer>
        <directoryBrowse enabled="true" />
    </system.webServer>
</configuration>

The client will call the service typically between 500ms-1000ms periods like this:

_client.PublishNotificationAsync(byte[] data);

and the callback will notify the client of notifications sent by other clients:

void client_NotifyNewsReceived(object sender, NewsServiceProxy.OnNewsSendReceivedEventArgs e)
        {
                e.Usernamer//WHich client published the data
                e.data//contents of the notification
        }

So to recap, when the number of clienst increase, and the service hosts upload speed over the internet is limited, the messages sent out by the service to the subscribers get buffered somewhere and get processed in a queue, which is whats causing the problem, I don't know where these messages are buffered. In a LAN the service works fine because the server has an upload speed equal to the its download speed (for 100KB/s of incoming calls, it sends out 100KB/s of notifications). Where are these messages getting buffered? And how can I clear this buffer?

I did something experimental to try and see if the messages are buffered at the service, I tried calling this method on the client, but it always returns 0, even when one client is still in the process of receiving notifications that somebody else sent 4-5 minutes ago:

[OperationContract(IsOneWay = false)]
public int GetQueuedMessages()
{

            return OperationContext.Current.OutgoingMessageHeaders.Count();
}
2

There are 2 answers

2
wal On

MaxBufferSize isn't going to help you with this problem. You're going to have to code it yourself I dont know of any existing solution/framework. It does however sound like an interesting problem. You could start by maintaining a Queue<Message> for each connected client and when pushing to this queue (or when the client calls dequeue) you can re-evaluate what Message should be sent.

UPDATE: Firstly, I would forget about trying to do this from the client side and in the configuration, you're going to have to code this yourself

Here I can see where you are sending to your clients:

 subscriber.Key.BeginOnNotificationSend(data, GetCurrentUser(), onNotifyCompletedNotificationSend, subscriber.Key);

so instead of asynchronously pushing these notifications to each client you should push them into a Queue<byte[]>. Each client connection will have its own queue and you should probably construct a class dedicated to each client connection:

note this code won't compile out of the box and likely has some logic errors, use only as a guide

public class ClientConnection
{
    private INewsNotification _callback;
    private Queue<byte> _queue = new Queue<byte>();
    private object _lock = new object();
    private bool _isSending
    public ClientConnection(INewsNotification callBack)
    {
           _callback=callback;
    }
    public void Enqueue(byte[] message)
    { 
       lock(_lock)
       {               
           //what happens here depends on what you want to do. 
           //Do you want to only send the latest message? 
           //if yes, you can just clear out the queue and put the new message in there,                         
           //or you could keep the most recent 5 messages.                
           if(_queue.Count > 0)
               _queue.Clear();
          _queue.Enqueue(message);
           if(!_isSending)
               BeginSendMessage();
       }
    }

    private void BeginSendMessage()
    {
       _isSending=true;
        _callback.BeginOnNotificationSend(_queue.Dequeue(), GetCurrentUser(), EndCallbackClient, subscriber.Key);

    }

    private void EndCallbackClient(IAsyncResult ar)
    {
        _callback.EndReceive(ar);
        lock(_lock)
        {
           _isSending=false;
           if(_queue.Count > 0)
              BeginSendMessage();//more messages to send
        }
    }
}

Imagine a scenario where one message is pushed to the client and while sending 9 more messages call ClientConnection.Enqueue. When the first message has finished it checks the queue which should contain only the last (9th message)

7
Dmitry Harnitski On

I did some math for your situation.

  • message size = 100KB
  • upload channel = 15KB/s
  • Download channel = 100KB/s
  • Client call the service 1-2 times a second

client will call the service typically between 500ms-1000ms periods

Is this correct?

For one client your download traffic only for messages will be 100-200KB/s and this is only message body. There will be more with header and much more with security enabled.

Messages are going to be combined for asynchronous call. So if we have 3 client and every sent a message callback contains 2 messages for every client. 4 client - 3 messages in every callback.

For 3 clients it will be 200-400KB/s in download channel.

For me it looks like messages are too big for bandwidth you declared.

Check if you can:

  1. Reduce message size. I do not know nature of your business so cannot give advice here.

  2. Use compression for messages or traffic.

  3. Increase network bandwidth. Without that even ideal solution will have very high latency. You can spend days and weeks optimizing your code and even if you properly utilized your network solution is still going to be slow.

I know that sounds like Captain Obvious but some questions just have no right answer unless you change question.

After performing steps above work with ServiceThrottlingBehavior in combination with custom code that will manage callback queue.

ServiceThrottlingBehavior rejects requests if boundary is reached.

http://msdn.microsoft.com/en-us/library/ms735114(v=vs.100).aspx

This is trivial sample. Real numbers should be defined specifically for your environment.

<serviceThrottling 
 maxConcurrentCalls="1" 
 maxConcurrentSessions="1" 
 maxConcurrentInstances="1"
/>

Update:

Big mistake on my part in the question, each call is 5KB/s,

Even with 5KB messages 15KB/s is not enough. Let's calculate traffic for only 3 users. 3 Incoming message per second. System now should use duplex to notify users what other did send. Every user should get messages from his partners. We have 3 messages total. One (that belongs to sender) may be skipped so every user should get 2 messages. 3 users will get 10KB (5KB + 5KB = one message 10KB) = 30KB. One message per second will make 30KB/sec in upload channel for 3 users.

Where are these messages getting buffered? And how can I clear this buffer?

It depends on how do you host your service. If it is self-hosted service it is not buffered at all. You have your code that tries to send message to receiver but it goes very slow because the channel is flooded. With IIS there could be some buffering but it is not good practice to touch it from outside.

Right solution is throttling. With limitation in bandwidth you should not try to send all messages to all clients. Instead you should for example limit amount of threads that send messages. So from 3 users above instead of sending 3 messages in parallel you may send them sequentially or decide that only one user will get update in that round and next one in next round.

So general idea is not try to send everything to everybody as soon as you have data but send only amount of data you can afford and control that using threads count or response speed.