Problem
I am using @aws-sdk/client-s3 3.414.0, and its SelectObjectContentCommand returns the S3 Select result as an AsyncIterable<SelectObjectContentEventStream>.
I then want to parse this result with csv-parse, which needs a readable stream as input so that I can pipe into it. When I use Readable.from() to turn the async iterable into a readable stream, I get the error below.
It looks as though Readable.from() is not accepting the AsyncIterable, but I am not sure why, since its signature takes iterable: Iterable<any> | AsyncIterable<any>. Any ideas what might be causing this, or a recommendation for getting the data into the right shape to be parsed by csv-parse? Thanks.
Error Produced
{
  "errorType": "TypeError",
  "errorMessage": "The \"chunk\" argument must be of type string or an instance of Buffer or Uint8Array. Received an instance of Object",
  "code": "ERR_INVALID_ARG_TYPE",
  "stack": [
    "TypeError [ERR_INVALID_ARG_TYPE]: The \"chunk\" argument must be of type string or an instance of Buffer or Uint8Array. Received an instance of Object",
    " at new NodeError (node:internal/errors:387:5)",
    " at _write (node:internal/streams/writable:314:13)",
    " at _Parser.Writable.write (node:internal/streams/writable:336:10)",
    " at Readable.ondata (node:internal/streams/readable:754:22)",
    " at Readable.emit (node:events:513:28)",
    " at Readable.emit (node:domain:489:12)",
    " at addChunk (node:internal/streams/readable:315:12)",
    " at readableAddChunk (node:internal/streams/readable:289:9)",
    " at Readable.push (node:internal/streams/readable:228:10)",
    " at next (node:internal/streams/from:98:31)"
  ]
}
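Reading the stack trace, the throw happens in _Parser.Writable.write rather than inside Readable.from() itself, which suggests the parser is being handed a plain object as a chunk. The following is a minimal sketch of that failure mode in isolation. The sink and the tryWrite helper are hypothetical stand-ins for the csv-parse parser, not part of any library, and the behaviour shown assumes a Node.js version that throws synchronously here, as the trace above does.

```typescript
import { Writable } from "node:stream";

// Hypothetical stand-in for the csv-parse parser: a Writable in byte mode
// (objectMode is off), so it only accepts string, Buffer or Uint8Array chunks.
const sink = new Writable({
  write(_chunk, _encoding, callback) {
    callback();
  },
});

// Hypothetical helper: attempts a write and reports the error code, if any.
function tryWrite(chunk: unknown): string {
  try {
    sink.write(chunk as any);
    return "ok";
  } catch (err) {
    return (err as NodeJS.ErrnoException).code ?? "unknown";
  }
}

// An event-like object is rejected; a string chunk is accepted.
console.log(tryWrite({ Records: {} }));
console.log(tryWrite("a,b\n"));
```

If this reading is right, the readable stream is emitting whole event objects and the parser is rejecting them on write.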
Example Code
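import {Readable} from "stream";
import {parse} from "csv-parse";
import {SelectObjectContentEventStream} from "@aws-sdk/client-s3";

// CsvRow and executeS3SelectQuery are defined elsewhere (not shown here).
async function getCsvRows(query: string): Promise<CsvRow[]> {
    const s3SelectResult: AsyncIterable<SelectObjectContentEventStream> = await executeS3SelectQuery(query);
    // Wrap the S3 Select event stream so it can be piped into csv-parse.
    const readableStream: Readable = Readable.from(s3SelectResult, {objectMode: true});
    const records: CsvRow[] = [];
    // This pipe is where the error above is thrown.
    const parser = readableStream
        .pipe(parse({
            delimiter: ",",
            relax_quotes: true
        }));
    for await (const record of parser) {
        records.push(record);
    }
    return records;
}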
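One possible way to get the data into the shape csv-parse expects is to unwrap each event before building the stream, sketched below. This assumes that each S3 Select event carrying data looks like { Records: { Payload: Uint8Array } } and that other events (stats, progress, end) have no Records payload. SelectEventLike, recordPayloads and toByteStream are hypothetical names introduced for illustration, not part of the SDK.

```typescript
import { Readable } from "node:stream";

// Assumed, simplified shape of one S3 Select event. The real SDK type is
// SelectObjectContentEventStream; only the Records payload matters here.
interface SelectEventLike {
  Records?: { Payload?: Uint8Array };
}

// Yield only the raw bytes carried by Records events, skipping all others.
async function* recordPayloads(
  events: AsyncIterable<SelectEventLike>
): AsyncGenerator<Uint8Array> {
  for await (const event of events) {
    const payload = event.Records?.Payload;
    if (payload && payload.length > 0) {
      yield payload;
    }
  }
}

// Build a byte-mode Readable that can be piped into a parser.
function toByteStream(events: AsyncIterable<SelectEventLike>): Readable {
  return Readable.from(recordPayloads(events), { objectMode: false });
}
```

Under these assumptions, getCsvRows could call toByteStream(s3SelectResult) in place of Readable.from(s3SelectResult, {objectMode: true}) and pipe the result into parse() as before, so that the parser receives Uint8Array chunks rather than event objects.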