Using Web Streams, create a TransformStream from several TransformStreams

865 views Asked by At

Is it possible to create a single TransformStream out of several other TransformStreams using whatwg streams (the web Streams API)?

For example, if I have two TransformStreams which run in sequence, like transformer1 and transformer2:

readable.pipeThrough(transformer1).pipeThrough(transformer2).pipeTo(writable)

Ultimately, I'd like to be able to convert that to

readable.pipeThrough(allTransformers).pipeTo(writable)

Where allTransformers is the TransformStream combining transformer1 and transformer2.

Below is not real functional code, but I'd think there would be a way to do something like this:

const allTransformers = transformer1.pipeThrough(transformer2)

This is clearly a simplified example, but you can imagine there being many transform streams and I'd like to refactor to a single, reusable transform pipeline.

1

There are 1 answers

4
check_ca On BEST ANSWER

I had the same issue, here are my solutions.

In the example below, UpperCaseTransformStream pipes the stream through TextDecoderStream, UpperCaseTextStream and TextEncoderStream.

class UpperCaseTextStream extends TransformStream {
  constructor() {
    super({
      transform(chunk, controller) {
        controller.enqueue(chunk.toUpperCase());
      }
    });
  }
}

class UpperCaseTransformStream {
  constructor(...strategies) {
    const { writable, readable } = new TransformStream({}, ...strategies);
    this.writable = writable;
    this.readable = readable
      .pipeThrough(new TextDecoderStream())
      .pipeThrough(new UpperCaseTextStream())
      .pipeThrough(new TextEncoderStream());
  }
}

const TEXT_CHUNK =
 "Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy " +
 "nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.";
const TEXT_CONTENT = new Array(1024 * 1024).fill(TEXT_CHUNK).join("");
const readableInput = new Blob([TEXT_CONTENT]).stream();
const readableOuput = readableInput.pipeThrough(new UpperCaseTransformStream());  
(async () => {
  const text = await new Response(readableOuput).text();
  console.log("Test OK?", text == TEXT_CONTENT.toUpperCase());
})().catch(console.error);

Here is a more generic implementation using the class PipelineStream which extends TransformStream and accepts an array of TransformStream instances as first parameter.

class UpperCaseTextStream extends TransformStream {
  constructor() {
    super({
      transform(chunk, controller) {
        controller.enqueue(chunk.toUpperCase());
      }
    });
  }
}

class PipelineStream extends TransformStream {
  constructor(transformStreams, ...strategies) {
    super({}, ...strategies);

    const readable = [super.readable, ...transformStreams]
      .reduce((readable, transform) => readable.pipeThrough(transform));

    Object.defineProperty(this, "readable", {
      get() {
        return readable;
      }
    });
  }
}

const TEXT_CHUNK =
 "Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy " +
 "nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.";
const TEXT_CONTENT = new Array(1024 * 1024).fill(TEXT_CHUNK).join("");
const readableInput = new Blob([TEXT_CONTENT]).stream();
const readableOuput = readableInput.pipeThrough(new PipelineStream([
  new TextDecoderStream(),
  new UpperCaseTextStream(),
  new TextEncoderStream()]));
(async () => {
  const text = await new Response(readableOuput).text();
  console.log("Test OK?", text == TEXT_CONTENT.toUpperCase());
})().catch(console.error);