BlockingCollection.TryTake() exceeds timeout

2.4k views Asked by At

In my app, I have several threads for handling TCP connection (one for reading, one for sending, one for handling new incomming connections). Each thread handles the given type of operation for all clients, so lets say it sends data to 5 TcpClient instances on different IPs. I am usning a BlockingCollection as a buffer, since I am accessing it from the sending thread, but also from another thread that generates the data to be sent. My function that is run in the sending thread looks like this:

    private void Sender()
    {
        while (run)
        {
            List<object[]> toRemove = new List<object[]>(); //bypass the fact that I cant remove a foreach iteration item from the BlockingCollection
            foreach (object[] obj in sendBuffer.ToList())
            {
                string IP = obj[0].ToString();
                string msg = obj[1].ToString();
                byte[] data = Encoding.UTF8.GetBytes(msg);
                foreach (TcpClient tcpc in TcpClients)
                {
                    if ((tcpc.Client.RemoteEndPoint as IPEndPoint).Address.ToString() == IP)
                    {
                        NetworkStream stream = tcpc.GetStream();
                        stream.Write(data, 0, data.Length);
                        break;
                    }
                }
                toRemove.Add(obj);
            }
            for (int i = 0; i < toRemove.Count; i++) //the bypass mentioned above
            {
                object[] rmv = toRemove[i];
                sendBuffer.TryTake(out rmv);
            }
        }
    }

Note: the BlockingCollection used is type <object[]>. My problem is that at some point of traffic, the buffer starts to fill up. I have set a limit of maximally 500 messages in the buffer, and it overflows this easily. Now, if I understand it correctly (not sure), what TryTake does is it tries to remove the item, and if the collection is being used at the moment, it waits and tries again. (Note: I have also tried setting a timeout to 50 milliseconds). If this is true (if not, someone please correct me and suggest a different reason), the problem is probably that the collection is busy most of the time the TryTake is called. Could it be that? And if yes, how to solve it?

As for the collection usage, by the thread that generates the data, the collection is accessed about once per 2 seconds in a foreach that iterates through a range of 1-80 items. The buffer starts having problem at about 20+ items, until then, its fine. The sender threads now only sends to one client only, later it would be up to 15. So in the peak, that would be 80 items x 15 users = roughly 1200 accesses per ~2 seconds. Any advice greatly appreciated, thank you.

1

There are 1 answers

0
Dave Manning On

TryTake does not operate like you have described, the default BlockingCollection uses a ConcurrentQueue to store items and TryTake will assign the next item in the queue to the out reference provided.

For example

BlockingCollection<object[]> sendBuffer = new BlockingCollection<object[]>();

object[] message = new object[2];
object[] message2 = new object[2];
// Add messages to the queue
sendBuffer.Add(message);
sendBuffer.Add(message2);

object[] toSend;
// Take next message off the queue
sendBuffer.TryTake(out toSend);

// toSend === message

In your situation you can use BlockingCollection.Take() to wait for messages to send:

BlockingCollection<object[]> sendBuffer = new BlockingCollection<object[]>();
// CancellationTokenSource is used in place of your run variable.
System.Threading.CancellationTokenSource cancellationSource 
= new System.Threading.CancellationTokenSource();

    private void Sender()
    {
        // Exit loop if cancellation requested
        while (!cancellationSource.Token.IsCancellationRequested)
        {

            object[] obj;

            try {
                // Blocks until an item is available in sendBuffer or cancellationSource.Cancel() is called.
                obj = sendBuffer.Take(cancellationSource.Token);
            } catch (OperationCanceledException) {
                // OperationCanceledException will be thrown if cancellationSource.Cancel() 
                // is called during call to sendBuffer.Take
                break;
            } catch (InvalidOperationException) {
                // An InvalidOperationException means that Take() was called on a completed collection.
                // See BlockingCollection<T>.CompleteAdding
                break;
            }

            string IP = obj[0].ToString();
            string msg = obj[1].ToString();
            byte[] data = Encoding.UTF8.GetBytes(msg);
            foreach (TcpClient tcpc in TcpClients) {
                if ((tcpc.Client.RemoteEndPoint as IPEndPoint).Address.ToString() == IP) {
                    NetworkStream stream = tcpc.GetStream();
                    stream.Write(data, 0, data.Length);
                    break;
                }
            }               
        }
    }