How to create a NodeJS Readable Stream from Async Iterable (AWS S3 SelectObjectContentEventStream)?


Problem

I am using @aws-sdk/client-s3 3.414.0, whose SelectObjectContentCommand returns the S3 Select result as AsyncIterable<SelectObjectContentEventStream>.

I am then trying to parse this with csv-parse, which expects its input to be piped in from a readable stream. However, I get the error below when I use Readable.from() to convert the AsyncIterable into a readable stream.

It looks as if Readable.from() is not accepting the AsyncIterable, but I am not sure why, since its signature accepts iterable: Iterable<any> | AsyncIterable<any>. Any ideas what might be causing this? Or any recommendations for getting the data into the right shape for csv-parse? Thanks.

Error Produced

{
    "errorType": "TypeError",
    "errorMessage": "The \"chunk\" argument must be of type string or an instance of Buffer or Uint8Array. Received an instance of Object",
    "code": "ERR_INVALID_ARG_TYPE",
    "stack": [
        "TypeError [ERR_INVALID_ARG_TYPE]: The \"chunk\" argument must be of type string or an instance of Buffer or Uint8Array. Received an instance of Object",
        "    at new NodeError (node:internal/errors:387:5)",
        "    at _write (node:internal/streams/writable:314:13)",
        "    at _Parser.Writable.write (node:internal/streams/writable:336:10)",
        "    at Readable.ondata (node:internal/streams/readable:754:22)",
        "    at Readable.emit (node:events:513:28)",
        "    at Readable.emit (node:domain:489:12)",
        "    at addChunk (node:internal/streams/readable:315:12)",
        "    at readableAddChunk (node:internal/streams/readable:289:9)",
        "    at Readable.push (node:internal/streams/readable:228:10)",
        "    at next (node:internal/streams/from:98:31)"
    ]
}
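Reading the stack trace bottom-up, Readable.from() does accept the iterable: its internal next() pushes the item (Readable.push), and the failure happens one step later in _Parser.Writable.write, i.e. on the csv-parse parser's writable side, which is not in object mode. A minimal sketch of that check, assuming Node.js 16 or later (the node:internal frames suggest a recent version): a byte-mode Writable rejects a plain object synchronously with ERR_INVALID_ARG_TYPE, while a Uint8Array is accepted. The names sink and eventLikeObject are hypothetical, and the object only loosely mimics an S3 Select event.

```typescript
import { Writable } from "node:stream";

// A byte-mode Writable (objectMode defaults to false), standing in for the
// writable side of the csv-parse parser seen in the stack trace.
const sink = new Writable({
  write(_chunk, _encoding, callback) {
    callback(); // discard the bytes; we only care whether write() accepts them
  },
});

// Hypothetical object loosely shaped like one S3 Select "Records" event.
const eventLikeObject = { Records: { Payload: new Uint8Array([97, 44, 98]) } };

let errorCode: string | undefined;
try {
  // A plain object is not a string, Buffer or Uint8Array, so write() throws.
  sink.write(eventLikeObject);
} catch (err) {
  errorCode = (err as NodeJS.ErrnoException).code;
}
console.log(errorCode); // ERR_INVALID_ARG_TYPE

// The Uint8Array nested inside the event is a valid chunk for a byte stream.
const accepted = sink.write(eventLikeObject.Records.Payload);
```

In other words, the objects yielded by the S3 Select iterable reach the parser unchanged, and the parser only accepts bytes or strings.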

Example Code

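import {Readable} from "stream";
import {parse} from "csv-parse";
import type {SelectObjectContentEventStream} from "@aws-sdk/client-s3";

// CsvRow and executeS3SelectQuery are defined elsewhere in the project (not shown).
async function getCsvRows(query: string): Promise<CsvRow[]> {
  const s3SelectResult: AsyncIterable<SelectObjectContentEventStream> = await executeS3SelectQuery(query);

  // Wrap the S3 Select event stream in a Node.js Readable.
  const readableStream: Readable = Readable.from(s3SelectResult, {objectMode: true});

  const records: CsvRow[] = [];

  // Pipe the stream through csv-parse and collect the parsed records.
  const parser = readableStream.pipe(parse({
    delimiter: ",",
    relax_quotes: true,
  }));

  for await (const record of parser) {
    records.push(record);
  }

  return records;
}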
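One way to get the data into a shape csv-parse accepts is to unwrap the events before building the stream. In the SDK v3 event stream, the result bytes arrive in Records events (event.Records.Payload, a Uint8Array), interleaved with Stats, Progress, Cont and End events. A minimal sketch under those assumptions: SelectEventLike is a hypothetical local stand-in covering only the field used here, and recordPayloads/toByteStream are illustrative helper names, not SDK APIs.

```typescript
import { Readable } from "node:stream";

// Hypothetical minimal stand-in for SelectObjectContentEventStream: only the
// Records member is modeled; Stats, Progress, Cont and End events are ignored.
interface SelectEventLike {
  Records?: { Payload?: Uint8Array };
}

// Yield only the raw CSV bytes carried by Records events.
async function* recordPayloads(
  events: AsyncIterable<SelectEventLike>,
): AsyncGenerator<Buffer> {
  for await (const event of events) {
    const payload = event.Records?.Payload;
    if (payload && payload.length > 0) {
      yield Buffer.from(payload); // copy into a Buffer, a valid byte chunk
    }
  }
}

// Build a Readable that emits Buffers, so it can be piped into a byte-mode parser.
function toByteStream(events: AsyncIterable<SelectEventLike>): Readable {
  return Readable.from(recordPayloads(events));
}
```

With this helper, the existing pipeline would become toByteStream(s3SelectResult).pipe(parse({ delimiter: ",", relax_quotes: true })); the SDK's iterable should be structurally compatible with SelectEventLike, though that is worth confirming against the installed SDK types. Because the payloads are forwarded as a continuous byte stream, a CSV row that happens to be split across two Records events is still reassembled by the parser.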