Unexpected behavior using ClientWebSocket SendAsync and ReceiveAsync methods simultaneously in different tasks (.NET Framework)


I have a custom WebSocket client (built on the System.Net.WebSockets library) that works quite well. Internally, the client has two separate Tasks: the first sends commands to the WebSocket server, and the second receives responses from the server. Each task runs an "infinite loop" (because they work in the background), like this:

SendTask:

// Task that sends commands through the web socket and writes them to the response queue.
// The implementations of responseQueue and commandQueue are shown below.
// commandQueue already contains some items at the beginning.
while (webSocket.State != WebSocketState.Closed)
{
  var command = await commandQueue.ReadAsync(cts);
  // Assumed encoding step (elided in the original snippet): UTF-8 bytes of the command.
  var bytesToSend = new ArraySegment<byte>(Encoding.UTF8.GetBytes(command));
  await webSocket.SendAsync(bytesToSend, WebSocketMessageType.Text, true, CancellationToken.None);
  // To log the command that was sent, push it into the response queue.
  responseQueue.Push(command);
}

ReceiveTask:

// Task that receives responses from the same web socket and pushes them into the response queue as well.

// These variables are needed for multi-packet responses.
int dataPerPacket = 8 * 1024; //8192
StringBuilder strBuilder = new StringBuilder();
var buffer = new ArraySegment<byte>(new byte[dataPerPacket]);

while (webSocket.State != WebSocketState.Closed)
{
  var receiveResult = await webSocket.ReceiveAsync(buffer, recCts);

  // receive message
  strBuilder.Append(Encoding.UTF8.GetString(buffer.Array, buffer.Offset, receiveResult.Count));
  //keep doing until end of message
  while (!receiveResult.EndOfMessage)
  {
      receiveResult = await webSocket.ReceiveAsync(buffer, recCts);
      strBuilder.Append(Encoding.UTF8.GetString(buffer.Array, buffer.Offset, receiveResult.Count));
  }

  responseQueue.Push(strBuilder.ToString());
  strBuilder.Clear();
}

I simplified my code, but nothing essential was lost. The WebSocket server is just a public echo server on the internet.

For the command and response queues I use a Channel approach and have implemented my own custom class (because I am on .NET Framework 4.5.2). The intention of this class is to produce (write) data to the responseQueue in the SendTask and consume (read) data from the responseQueue in the ReceiveTask efficiently, that is, without blocking and without burning CPU in the infinite loops.

Implementation of Channel:

public class Channel
{
        private ConcurrentQueue<string> _queue;
        private SemaphoreSlim _flag;

        public Channel()
        {
            _queue = new ConcurrentQueue<string>();
            _flag = new SemaphoreSlim(0);
        }
        
        public async Task<string> ReadAsync(CancellationToken readCancelToken)
        {
            await _flag.WaitAsync(readCancelToken);
            if (readCancelToken.IsCancellationRequested)
            {
                return default;
            }

            string item; // declared up front; "out item" alone does not compile
            if (_queue.TryDequeue(out item))
            {
                return item;
            }

            return default;
        }

        public void Push(string item)
        {
            _queue.Enqueue(item);
            _flag.Release();
        }
    }

However, for a reason I cannot identify, the behavior at runtime is not what I expect. I expect items to be pushed into responseQueue in the following order:

command 1, command 2, response 1, command 3, response 2,...

that is, interleaved (asynchronous) behavior: while the next SendAsync is executing, a response is received and pushed into the queue as well. In fact, I get the following order:

command 1, command 2,.....command 100, response 1, response 2, .... 

So my question is: why does this happen, and how can I get the expected order?

I'm sure the cause is not the Channel implementation.

What I have done to investigate:

  1. If I add a small delay to SendTask (at least await Task.Delay(1)), I get the expected order, that is, the interleaved behavior.
  2. I used the Stopwatch class to measure the elapsed time of the SendAsync call, and it showed 0 milliseconds. That output is strange, and I don't know how to explain it.
  3. I looked at the ClientWebSocket source code to confirm that SendAsync should take some time (as far as I understand it).
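
Regarding item 2: Stopwatch.ElapsedMilliseconds is a long that reports whole milliseconds only, so any call that finishes in under 1 ms shows up as 0. Below is a minimal sketch of measuring with sub-millisecond resolution through Stopwatch.Elapsed. FakeSendAsync and MeasureAsync are hypothetical names introduced for illustration; FakeSendAsync is only a stand-in for webSocket.SendAsync and says nothing about how long the real call takes.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

static class SendTiming
{
    // Hypothetical stand-in for webSocket.SendAsync; it completes immediately.
    static Task FakeSendAsync()
    {
        return Task.FromResult(true);
    }

    // Awaits the given operation and returns how long it took as a TimeSpan.
    public static async Task<TimeSpan> MeasureAsync(Func<Task> operation)
    {
        var sw = Stopwatch.StartNew();
        await operation();
        sw.Stop();
        return sw.Elapsed;
    }

    static void Main()
    {
        TimeSpan elapsed = MeasureAsync(FakeSendAsync).GetAwaiter().GetResult();
        // TotalMilliseconds is a double and keeps the fractional part,
        // unlike ElapsedMilliseconds, which reports whole milliseconds.
        Console.WriteLine("Elapsed: {0:F4} ms ({1} ticks)",
            elapsed.TotalMilliseconds, elapsed.Ticks);
    }
}
```

To time the real call, the same helper could be used as MeasureAsync(() => webSocket.SendAsync(...)), with the arguments from SendTask.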

There is 1 answer

Answered by Gustavo F

Based on what you described, the problem is:

In your ReceiveTask, nothing ever waits on (enters) the semaphore; it only releases it. For a semaphore to coordinate the tasks, every task has to request entry and then signal the release.

It works when you add await Task.Delay(1) because that gives ReceiveTask time to call Release.
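
The effect of a send-side delay on the logged order can be tried in isolation. The sketch below is only a timing simulation under stated assumptions: the "send" completes immediately, each echo arrives after a fixed latencyMs, and OrderingSketch, RunAsync, sendDelayMs and latencyMs are hypothetical names that are not part of the original code. It does not use ClientWebSocket or the Channel class, so it shows nothing about their internals.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

static class OrderingSketch
{
    // Simulates a send loop plus delayed echoes and returns the order
    // in which entries were logged.
    // sendDelayMs: pause after each simulated send.
    // latencyMs:   assumed time until the echo for a command arrives.
    public static async Task<string[]> RunAsync(int count, int sendDelayMs, int latencyMs)
    {
        var log = new ConcurrentQueue<string>();
        var echoes = new Task[count];

        for (int i = 1; i <= count; i++)
        {
            int n = i; // per-iteration copy captured by the lambda below
            log.Enqueue("command " + n); // simulated send: returns immediately
            echoes[n - 1] = Task.Delay(latencyMs) // simulated echo after the latency
                .ContinueWith(_ => log.Enqueue("response " + n));

            if (sendDelayMs > 0)
            {
                await Task.Delay(sendDelayMs);
            }
        }

        await Task.WhenAll(echoes);
        return log.ToArray();
    }

    static void Main()
    {
        // No delay between sends: every command is logged before any echo arrives.
        Console.WriteLine(string.Join(", ", RunAsync(5, 0, 50).GetAwaiter().GetResult()));
        // Delay longer than the latency: commands and responses interleave.
        Console.WriteLine(string.Join(", ", RunAsync(5, 100, 50).GetAwaiter().GetResult()));
    }
}
```

In the first run the responses may appear in any order relative to each other, because their timers expire at almost the same moment.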

Here is an example that simulates the network SendAsync and ReceiveAsync operations:

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

class q77091244
{
    static void Main(string[] args)
    {
        //Channel commandQueue = new Channel("command");       
        Channel responseQueue = new Channel();  

        // SendTask
        Task SendTask = Task.Run(async () => 
        {
            Console.WriteLine("SendTask started...");
            for (int i = 0; i < 10; i++)
            {
                CancellationToken cts = new CancellationToken();
                Console.WriteLine("SendTask Waiting");
                var command = await responseQueue.ReadAsync(cts);
                Console.WriteLine("SendTask Sending");
                await Task.Delay(100); //simulates await webSocket.SendAsync...
                //in order to log command sent, push it into response queue.
                Console.WriteLine("SendTask Push");
                responseQueue.Push(command);
            }
        });

        Task ReceiveTask = Task.Run(async () => 
        {
            Console.WriteLine("ReceiveTask started...");
            StringBuilder strBuilder = new StringBuilder();
            var buffer = Encoding.UTF8.GetBytes("item x");

            for (int i = 0; i < 10; i++)
            {
                CancellationToken cts = new CancellationToken();
                Console.WriteLine("ReceiveTask Waiting");
                await responseQueue.WaitAsync(cts);                
                // receive message
                strBuilder.Append(Encoding.UTF8.GetString(buffer, 0, buffer.Length));
                Console.WriteLine("ReceiveTask Receiving");
                await Task.Delay(150); //simulates await webSocket.ReceiveAsync(buffer, recCts);
                Console.WriteLine("ReceiveTask Push");
                responseQueue.Push(strBuilder.ToString());
                strBuilder.Clear();
            }
        });

        // Wait for both loops to finish instead of sleeping for a fixed time.
        Task.WaitAll(SendTask, ReceiveTask);

    }
}

The Channel is the same, except that it initializes the SemaphoreSlim with 1 so that it starts in a "green" (available) state, and it adds a WaitAsync method that waits on the semaphore without performing a Dequeue:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class Channel
{
        private ConcurrentQueue<string> _queue;
        private SemaphoreSlim _flag;

        public Channel()
        {
            _queue = new ConcurrentQueue<string>();
            _flag = new SemaphoreSlim(1);
        }
        
        public async Task WaitAsync(CancellationToken readCancelToken)
        {
            await _flag.WaitAsync(readCancelToken);
        }

        public async Task<string> ReadAsync(CancellationToken readCancelToken)
        {          
            await _flag.WaitAsync(readCancelToken);
            if (readCancelToken.IsCancellationRequested)
            {
                return default;
            }

            string item;
            if (_queue.TryDequeue(out item))
            {
                Console.WriteLine("dequed " + item);
                return item;
            }

            return default;
        }

        public void Push(string item)
        {
            Console.WriteLine("enqueue " + item);
            _queue.Enqueue(item);
            _flag.Release();
        }
    }

Output:

SendTask started...
SendTask Waiting
ReceiveTask started...
SendTask Sending
ReceiveTask Waiting
SendTask Push
enqueue 
SendTask Waiting
ReceiveTask Receiving
ReceiveTask Push
enqueue item x
ReceiveTask Waiting
dequed
SendTask Sending
SendTask Push
enqueue 
SendTask Waiting
ReceiveTask Receiving
ReceiveTask Push
enqueue item x
ReceiveTask Waiting
dequed item x
SendTask Sending
SendTask Push
enqueue item x
SendTask Waiting
ReceiveTask Receiving
ReceiveTask Push
enqueue item x
ReceiveTask Waiting
dequed
SendTask Sending
SendTask Push
enqueue