Consuming an IAsyncEnumerable that makes an async call to another service or API and the response returns in batches


I am trying to create an API that returns a result as soon as it is ready. The goal is:

  1. An API endpoint that yields a result as soon as it is ready
  2. A consuming app that calls the endpoint and processes the data as soon as it is ready

My API calls another service as such:

[HttpGet]
[Route("stream")]
public async IAsyncEnumerable<ReturnModel> GetStream()
{        
    // I'm declaring a separate variable for the request message as I'm also using this for Post.
    var request = new HttpRequestMessage
    {
        Method = HttpMethod.Get,
        RequestUri = new Uri(the_url_that_allowed_streaming_of_data)
    };
                
    var client = new HttpClient();
    client.DefaultRequestHeaders.Add("Accept", "application/json"); // needed for Post
    
    var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
    
    var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);
    using var reader = new StreamReader(stream);
    // ReadLineAsync returns null at end of stream, which also avoids the
    // potentially blocking EndOfStream check on a network stream.
    while (await reader.ReadLineAsync() is { } message)
    {                
        Console.WriteLine(message); // used to check if I'm getting the data fast enough.
        
        // Do something with the message here.
        
        yield return new ReturnModel { ... };
    }
}
}

If I hit the stream URL in a browser, the responses are written to the page in batches (several yields at a time rather than one per yield).

I consume that in a console app with the code below, and it shows the same behavior as the browser: the results arrive in batches.

static async Task ConsumeStream(string url)
{
    var client = new HttpClient();
        
    var request = new HttpRequestMessage
    {
        Method = HttpMethod.Get,
        RequestUri = new Uri(url),
    };

    var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
    var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);
    IAsyncEnumerable<ReturnModel> items = System.Text.Json.JsonSerializer
        .DeserializeAsyncEnumerable<ReturnModel>(
            stream,
            new JsonSerializerOptions
            {
                PropertyNameCaseInsensitive = true,
                DefaultBufferSize = 10 // no matter what value I put here, the result is the same
            });

    var lineCtr = 0;
    await foreach (var item in items)
    {
        lineCtr++;
        Console.WriteLine($"Line: {lineCtr} ---- Time: {DateTime.Now.ToLongTimeString()} ---- Content: {item.Content}"); // this does not write out immediately
    }   
}
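As a side note, `DeserializeAsyncEnumerable` itself does yield elements incrementally as it parses them; it does not need the whole array first. A minimal, self-contained sketch (using an in-memory stream and a hypothetical `Item` record in place of `ReturnModel`) showing it enumerating a JSON array element by element:

```csharp
using System;
using System.IO;
using System.Text;
using System.Text.Json;

// A small JSON array standing in for the streamed response body.
var json = "[{\"id\":1,\"content\":\"a\"},{\"id\":2,\"content\":\"b\"},{\"id\":3,\"content\":\"c\"}]";
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(json));

var options = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
var count = 0;
await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<Item>(stream, options))
{
    // Each array element is yielded as soon as it has been parsed.
    Console.WriteLine($"{item!.Id}: {item.Content}");
    count++;
}
Console.WriteLine($"Total: {count}");

// Hypothetical model used only for this demo.
record Item(int Id, string Content);
```

So the consumer side is working as designed; the batching has to come from how the bytes arrive on the wire, which the answer below bears out.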

I tried this other approach, which is worse: it seems to wait for the entire payload before processing it and writing it out to the console.

// All the Console.WriteLine is shown in the window at the same time.
static async Task ConsumeStreamV2(string url)
{
    var client = new HttpClient();
        
    var request = new HttpRequestMessage
    {
        Method = HttpMethod.Get,
        RequestUri = new Uri(url),
    };

    var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
    var stream = await response.Content.ReadAsStreamAsync();
    
    using var reader = new StreamReader(stream);
    // ReadLineAsync returns null at end of stream.
    while (await reader.ReadLineAsync() is { } message)
    {
        Console.WriteLine($"Time: {DateTime.Now.ToLongTimeString()} ---- Content: {message}");
    }

}

What am I missing?

*I am using .NET 6.0

1 Answer

Answered by s_v:

I was facing the same issue, and I think I found the reasoning behind it. It comes down to when System.Text.Json flushes the response body while serializing an IAsyncEnumerable. It does not flush after every step of the async enumeration; instead, it flushes when an internal buffer reaches the serializer's size limit.

Their documentation lists the default buffer size as 16,384 bytes.
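One way around that threshold (a sketch, not the asker's code) is to bypass the IAsyncEnumerable return type on the server and write newline-delimited JSON to the output stream yourself, flushing after every item. The `WriteNdjsonAsync` helper and the `Item` record below are illustrative names; the demo uses a `MemoryStream` standing in for `Response.Body`:

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Serialize each item as one JSON line and flush immediately, so the
// client sees it right away instead of waiting for a buffer to fill.
static async Task WriteNdjsonAsync<T>(Stream output, IAsyncEnumerable<T> items)
{
    await foreach (var item in items)
    {
        await JsonSerializer.SerializeAsync(output, item);
        output.WriteByte((byte)'\n');
        await output.FlushAsync(); // the key step: flush per item, not per buffer
    }
}

// A stand-in producer; in the controller this would be the upstream service call.
static async IAsyncEnumerable<Item> Produce()
{
    yield return new Item(1, "first");
    await Task.Delay(10); // simulate the upstream service pausing between items
    yield return new Item(2, "second");
}

using var ms = new MemoryStream();
await WriteNdjsonAsync(ms, Produce());
var text = Encoding.UTF8.GetString(ms.ToArray());
Console.Write(text);

// Hypothetical model used only for this demo.
record Item(int Id, string Content);
```

In the real endpoint you would pass `Response.Body` instead of the `MemoryStream` and set a content type such as `application/x-ndjson`; the console app could then read one line at a time with `ReadLineAsync` and deserialize each line individually, sidestepping the serializer's internal buffer entirely.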