How to await an Array of async Tasks without blowing the stack?

141 views Asked by At

A large array of Tasks blows the stack if you want to await them all, even if the array fold is stack-safe, because it yields a large deferred function call tree:

const record = (type, o) =>
  (o[type.name || type] = type.name || type, o);

const thisify = f => f({});

const arrFold = f => init => xs => {
  let acc = init;
  
  for (let i = 0; i < xs.length; i++)
    acc = f(acc) (xs[i], i);

  return acc;
};

const Task = task => record(
  Task,
  thisify(o => {
    o.task = (res, rej) =>
      task(x => {
        o.task = k => k(x);
        return res(x);
      }, rej);
    
    return o;
  }));

const taskMap = f => tx =>
  Task((res, rej) =>
    tx.task(x => res(f(x)), rej));

const taskOf = x =>
  Task((res, rej) => res(x));

const taskAnd = tx => ty =>
  Task((res, rej) =>
    tx.task(x =>
      ty.task(y =>
        res([x, y]), rej), rej));

const taskAll =
  arrFold(tx => ty =>
    taskMap(([x, y]) =>
      xs => x => xs.concat([x]))
        (taskAnd(tx) (ty)))
          (taskOf([]));

const inc = x =>
  Task((res, rej) =>
    setTimeout(x => res(x + 1), 0, x));
    
const xs = Array(1e5).fill(inc(0));

const main = taskAll(xs);

main.task(console.log, console.error);

In order to solve this issue you usually break the function calls with a special data structure and a corresponding trampoline:

const Call = f => (...args) =>
  ({tag: "Call", f, args});

const deferredRec = step => {
  while (step && step.tag === "Call")
    step = step.f(...step.args);

  return step;
};

Now the decisive function in taskAll seems to be taskMap, where two operations wind up the stack:

const taskMap = f => tx =>
  Task((res, rej) =>
    tx.task(x => res(f(x)), rej));
//               ^^^^^^^^^
//  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^

const taskMap = f => tx =>
  Task((res, rej) =>
    Call(f => tx.task(f)) (x => Call(res) (f(x)), rej));

While the adjustment prevents the stack overflow, it unfortunately stops the computation from running to completion, that is to say the final continuation console.log is never called but the computation stops after calling inc once (see line A):

const deferredRec = step => {
  while (step && step.tag === "Call")
    step = step.f(...step.args);

  return step;
};

const Call = f => (...args) =>
  ({tag: "Call", f, args});

const record = (type, o) =>
  (o[type.name || type] = type.name || type, o);

const thisify = f => f({});

const arrFold = f => init => xs => {
  let acc = init;
  
  for (let i = 0; i < xs.length; i++)
    acc = f(acc) (xs[i], i);

  return acc;
};

const Task = task => record(
  Task,
  thisify(o => {
    o.task = (res, rej) =>
      task(x => {
        o.task = k => k(x);
        return res(x);
      }, rej);
    
    return o;
  }));

const taskMap = f => tx =>
  Task((res, rej) =>
    Call(f => tx.task(f)) (x => Call(res) (f(x)), rej));

const taskOf = x =>
  Task((res, rej) => res(x));

const taskAnd = tx => ty =>
  Task((res, rej) =>
    tx.task(x =>
      ty.task(y =>
        res([x, y]), rej), rej));

const taskAll =
  arrFold(tx => ty =>
    taskMap(([xs, x]) =>
      xs.concat([x]))
        (taskAnd(tx) (ty)))
          (taskOf([]));

const inc = x =>
  Task((res, rej) =>
    setTimeout(x => (console.log("inc"), res(x + 1)), 0, x)); // A
    
const xs = Array(3).fill(inc(0));

const main = taskAll(xs);

deferredRec(main.task(console.log, console.error));

How can this be done right? Is there a more general approach for all sorts of CPS code? Please note that I don't want to give up on lazyness.

1

There are 1 answers

0
AudioBubble On

I figured it out myself. inc (line A) just needs to apply the trampoline:

const deferredRec = step => {
  while (step && step.tag === "Call")
    step = step.f(...step.args);

  return step;
};

const Call = f => (...args) =>
  ({tag: "Call", f, args});

const record = (type, o) =>
  (o[type.name || type] = type.name || type, o);

const thisify = f => f({});

const arrFold = f => init => xs => {
  let acc = init;
  
  for (let i = 0; i < xs.length; i++)
    acc = f(acc) (xs[i], i);

  return acc;
};

const Task = task => record(
  Task,
  thisify(o => {
    o.task = (res, rej) =>
      task(x => {
        o.task = k => k(x);
        return res(x);
      }, rej);
    
    return o;
  }));

const taskMap = f => tx =>
  Task((res, rej) =>
    Call(f => tx.task(f)) (x => Call(res) (f(x)), rej));

const taskOf = x =>
  Task((res, rej) => res(x));

const taskAnd = tx => ty =>
  Task((res, rej) =>
    tx.task(x =>
      ty.task(y =>
        res([x, y]), rej), rej));

const taskAll =
  arrFold(tx => ty =>
    taskMap(([xs, x]) =>
      xs.concat([x]))
        (taskAnd(tx) (ty)))
          (taskOf([]));

const inc = x =>
  Task((res, rej) =>
    setTimeout(x => deferredRec(res(x + 1)), 0, x)); // A
    
const xs = Array(1e4).fill(inc(0));

const main = taskAll(xs);

deferredRec(main.task(console.log, console.error));