How to know when a fetch ends without locking the body stream?


I'm making requests to an API, but their server only allows a certain number of active connections, so I would like to limit the number of ongoing fetches. For my purposes, a fetch is only completed (not ongoing) when the HTTP response body arrives at the client.

I would like to create an abstraction like this:

const fetchLimiter = new FetchLimiter(maxConnections);
fetchLimiter.fetch(url, options); // Returns the same thing as fetch()

This would make things a lot simpler, but there seems to be no way of knowing when a stream being used by other code ends, because streams are locked while they are being read. It is possible to use ReadableStream.tee() to split the stream into two, use one and return the other to the caller (possibly also constructing a Response with it), but this would degrade performance, right?


There is 1 answer

David784 On

Since fetch uses promises, you can take advantage of that to make a simple queue system.

This is a method I've used before for queuing promise-based work. It enqueues an item by creating a Promise and adding its resolver to an array. Until that Promise resolves, the await in front of the fetch keeps the queued request from starting.

And all we have to do to start the next fetch when one finishes is just grab the next resolver and invoke it. The promise resolves, and then the fetch starts!

Best part: since we don't actually consume the fetch result, there's no need to clone anything. We just pass it on intact, so you can consume it in a later then.

*Edit: since the body is still streaming after the fetch promise resolves, I added a third parameter, respType, so that you can pass in the body type and have FetchLimiter retrieve and parse the body for you.

The supported body methods (arrayBuffer, blob, json, text and formData) all return a promise that eventually resolves with the actual content.

FetchLimiter returns an array of [response, data], so you can still check things like the response code, headers, etc.

For that matter, you could even pass in a callback to await in place of the body method if you needed something more complex, such as parsing the body differently depending on the response code.
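
A minimal sketch of that callback idea, assuming a class-based variant of the limiter. The names CallbackFetchLimiter, readBody and jsonOrText are made up for illustration and are not part of the code in this answer. The try/finally is an addition so that a rejecting fetch or callback still frees its slot.

```javascript
// Sketch only: CallbackFetchLimiter, readBody and jsonOrText are hypothetical names.
class CallbackFetchLimiter {
  constructor(maxActive = 3, fetchFn = (...args) => fetch(...args)) {
    this.queue = [];
    this.active = 0;
    this.maxActive = maxActive;
    this.fetchFn = fetchFn;
  }

  async fetch(resource, init, readBody) {
    if (this.active >= this.maxActive) {
      // wait in line until a finished request resolves this promise
      await new Promise(resolve => this.queue.push(resolve));
    }
    this.active++;
    try {
      const resp = await this.fetchFn(resource, init);
      // the slot stays occupied until the callback's promise settles
      const data = readBody ? await readBody(resp) : undefined;
      return [resp, data];
    } finally {
      // runs on success and on rejection
      this.active--;
      if (this.queue.length) this.queue.shift()();
    }
  }
}

// Example callback: parse JSON on success, keep the raw text of an error page.
const jsonOrText = resp => (resp.ok ? resp.json() : resp.text());
```

Usage would look like `const [resp, data] = await limiter.fetch(url, options, jsonOrText);`. Leaving out the callback returns the response untouched, with data undefined.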

Example

I added comments to indicate where the FetchLimiter code begins and ends...the rest is just demo code.

The demo uses a fake fetch built on setTimeout, which resolves after 0.5 to 1.5 seconds. It starts the first three requests immediately; at that point all three active slots are taken, and the remaining requests wait for one to resolve.

When that happens, you'll see the log line saying the promise has resolved, then the next request in the queue will start, and then you'll see the then in the for loop run. I added that then just so you could see the order of events.

(function() {
  const fetch = (resource, init) => new Promise((resolve, reject) => {
    console.log('starting ' + resource);
    setTimeout(() => {
      console.log(' - resolving ' + resource);
      resolve(resource);
    }, 500 + 1000 * Math.random());
  });

  function FetchLimiter() {
    this.queue = [];
    this.active = 0;
    this.maxActive = 3;
    this.fetchFn = fetch;
  }
  FetchLimiter.prototype.fetch = async function(resource, init, respType) {
    // if at max active, enqueue the next request by adding a promise
    // ahead of it, and putting the resolver in the "queue" array.
    if (this.active >= this.maxActive) {
      await new Promise(resolve => {
        this.queue.push(resolve); // push, adds to end of array
      });
    }
    this.active++; // increment active once we're about to start the fetch
    const resp = await this.fetchFn(resource, init);
    let data;
    if (['arrayBuffer', 'blob', 'json', 'text', 'formData'].indexOf(respType) >= 0)
      data = await resp[respType]();
    this.active--; // decrement active once fetch is done
    this.checkQueue(); // time to start the next fetch from queue
    return [resp, data]; // [response, parsed body]; data is undefined without respType
  };

  FetchLimiter.prototype.checkQueue = function() {
    if (this.active < this.maxActive && this.queue.length) {
      // shift pulls from the start of the array. This gives first in, first out.
      const next = this.queue.shift();
      next('resolved'); // resolve promise, value doesn't matter
    }
  }

  const limiter = new FetchLimiter();
  for (let i = 0; i < 9; i++) {
    limiter.fetch('/mypage/' + i)
      // fetch() resolves with [resp, data], so unpack the response before logging
      .then(([resp]) => console.log(' - .then ' + resp));
  }
})();

Caveats:

  • The fetch promise resolves once the response headers arrive, so the body may still be streaming at that point, which seems to be a concern for you. If that's a problem, you could use one of the Body mixin methods like blob, text or json, which don't resolve until the body content has been completely read (see here)

  • I intentionally kept it very short (about 15 lines of actual code) as a very simple proof of concept. In production code you'd want to add error handling, so that if the fetch rejects because of a connection error, you still decrement the active counter and start the next fetch.

  • Of course it's also using async/await syntax, because it's so much easier to read. If you need to support older browsers, you'd want to rewrite it with plain Promises or transpile it with Babel or an equivalent.
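
The first two caveats can be combined in one rough sketch. This is a different technique from the code above, not a drop-in patch: instead of parsing the body, it hands the caller a new Response whose body is a pass-through stream, and frees the slot when that stream ends, fails or is cancelled. StreamingFetchLimiter and trackBody are hypothetical names, and the sketch assumes an environment with the standard Response and ReadableStream globals (current browsers, Node 18+).

```javascript
// Sketch only: trackBody and StreamingFetchLimiter are hypothetical names.
function trackBody(response, release) {
  if (!response.body) { // e.g. 204 responses or HEAD requests
    release();
    return response;
  }
  const reader = response.body.getReader();
  const body = new ReadableStream({
    async pull(controller) {
      try {
        const { done, value } = await reader.read();
        if (done) {
          release(); // the whole body has arrived
          controller.close();
        } else {
          controller.enqueue(value);
        }
      } catch (err) {
        release(); // a failed body also frees the slot
        controller.error(err);
      }
    },
    cancel(reason) {
      release(); // the caller gave up on the body
      return reader.cancel(reason);
    },
  }, { highWaterMark: 0 }); // only read from the network when the caller reads
  // The copy keeps status and headers, but not response.url or response.type.
  return new Response(body, {
    status: response.status,
    statusText: response.statusText,
    headers: response.headers,
  });
}

class StreamingFetchLimiter {
  constructor(maxActive = 3, fetchFn = (...args) => fetch(...args)) {
    this.queue = [];
    this.active = 0;
    this.maxActive = maxActive;
    this.fetchFn = fetchFn;
  }

  async fetch(resource, init) {
    if (this.active >= this.maxActive) {
      await new Promise(resolve => this.queue.push(resolve));
    }
    this.active++;
    let released = false;
    const release = () => {
      if (released) return; // free the slot at most once per request
      released = true;
      this.active--;
      if (this.queue.length) this.queue.shift()();
    };
    try {
      return trackBody(await this.fetchFn(resource, init), release);
    } catch (err) {
      release(); // connection error: free the slot, then rethrow
      throw err;
    }
  }
}
```

One trade-off to keep in mind: a slot stays occupied until the caller reads the body to the end or cancels it, so a response that is never consumed blocks the queue.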