A new instance of the readable stream is not made on following api-calls

20 views Asked by At

I have an async function that creates a request to an external stream; it needs to download items from the source and store them in my database. It works fine the first time after my web server starts, but subsequent calls to the endpoint do not create a new readable stream - as if it knows it has been called previously. But shouldn't each function call run the code again, with no memory of how many times the function was called before?

Below is the function that is triggered by calling my API endpoint. It works perfectly the first time I call it. I expected it to create new instances of the request and stream the data again on subsequent calls, but it did not.

/**
 * Express handler that streams items from the upstream endpoint and persists
 * them while the download is in progress.
 *
 * A new upstream request and a new JSON parser are created on every call.
 * The HTTP response is sent immediately (200) and the import keeps running
 * in the background, so failures are reported through the log rather than
 * through the response.
 *
 * @param req - Incoming Express request (not read by this handler).
 * @param res - Express response; receives a bare 200 as soon as the import is wired up.
 */
const requestItems = async (req: Request, res: Response) => {
  const stream = request({ url: `${baseUrl}/${endpoint}?${query.toString()}` })
  const parser = JSONStream.parse('*', modifyItem)

  // An 'error' event without a listener is thrown by the emitter, so both
  // ends of the pipe get a listener before any data can flow.
  stream.on('error', (err) => {
    console.error('Upstream request failed', err)
  })
  parser.on('error', (err) => {
    console.error('Failed to parse upstream JSON', err)
  })

  parser.on('data', async (ad) => {
    // Hold back the source while the database write is in flight.
    stream.pause()
    console.log("Inside")
    try {
      await writeToDatabase()
    } catch (err) {
      console.error('writeToDatabase failed', err)
    } finally {
      // Always resume: if the write rejects, the stream must not stay
      // paused forever.
      stream.resume()
    }
  })

  stream.on('complete', async () => {
    try {
      await writeToDatabase()
      console.log('Stream completed')
    } catch (err) {
      console.error('writeToDatabase failed', err)
    }
  })

  // Start piping only after every listener is attached.
  stream.pipe(parser)
  res.sendStatus(200)
}

I am not using fetch to make the HTTP request; instead I am importing request (which uses the through package). My imports:

import request from 'request'
import { Request, Response } from 'express'
import JSONStream from 'JSONStream'

On the first call to requestItems it steps into parser.on('data') and logs "Inside". On subsequent calls it does not step into parser.on('data'), which I expected it to do.

I understand that a stream can only be read once, but I thought that meant once per invocation. What do I need to change for the request to be called on each function call?

The first time I call it, this is what logging the stream request outputs:

{
  _events: [Object: null prototype] { pipe: [Function (anonymous)] },
  _eventsCount: 1,
  _maxListeners: undefined,
  readable: true,
  writable: true,
  _qs: Querystring {
    request: [Circular *1],
    lib: {
      formats: [Object],
      parse: [Function (anonymous)],
      stringify: [Function (anonymous)]
    },
    useQuerystring: undefined,
    parseOptions: {},
    stringifyOptions: {}
  },
  _auth: Auth {
    request: [Circular *1],
    hasAuth: false,
    sentAuth: false,
    bearerToken: null,
    user: null,
    pass: null
  },
  _oauth: OAuth { request: [Circular *1], params: null },
  _multipart: Multipart {
    request: [Circular *1],
    boundary: 'ef9ad6e3-446a-4611-aa1b-ccfde61ac033',
    chunked: false,
    body: null
  },
  _redirect: Redirect {
    request: [Circular *1],
    followRedirect: true,
    followRedirects: true,
    followAllRedirects: false,
    followOriginalHttpMethod: false,
    allowRedirect: [Function (anonymous)],
    maxRedirects: 10,
    redirects: [],
    redirectsFollowed: 0,
    removeRefererHeader: false
  },
  _tunnel: Tunnel {
    request: [Circular *1],
    proxyHeaderWhiteList: [
      'accept',           'accept-charset',
      'accept-encoding',  'accept-language',
      'accept-ranges',    'cache-control',
      'content-encoding', 'content-language',
      'content-location', 'content-md5',
      'content-range',    'content-type',
      'connection',       'date',
      'expect',           'max-forwards',
      'pragma',           'referer',
      'te',               'user-agent',
      'via'
    ],
    proxyHeaderExclusiveList: []
  },
  headers: { host: 'x.api.y.com' },
  setHeader: [Function (anonymous)],
  hasHeader: [Function (anonymous)],
  getHeader: [Function (anonymous)],
  removeHeader: [Function (anonymous)],
  method: 'GET',
  localAddress: undefined,
  pool: {},
  dests: [],
  __isRequestRequest: true,
  uri: Url {
    protocol: 'https:',
    slashes: true,
    auth: null,
    host: 'x.api.y.com',
    port: 443,
    hostname: 'x.api.y.com',
    hash: null,
    search: '?date=2024-02-10T10%3A34%3A44',
    query: 'date=2024-02-10T10%3A34%3A44',
    pathname: '/stream',
    path: '/stream?date=2024-02-10T10%3A34%3A44'
  },
  proxy: null,
  tunnel: true,
  setHost: true,
  originalCookieHeader: undefined,
  _disableCookies: true,
  _jar: undefined,
  port: 443,
  host: 'x.api.y.com',
  path: '/stream?date=2024-02-10T10%3A34%3A44',
  httpModule: {
    Agent: [Function: Agent],
    globalAgent: Agent {
      _events: [Object: null prototype],
      _eventsCount: 2,
      _maxListeners: undefined,
      defaultPort: 443,
      protocol: 'https:',
      options: [Object: null prototype],
      requests: [Object: null prototype] {},
      sockets: [Object: null prototype] {},
      freeSockets: [Object: null prototype] {},
      keepAliveMsecs: 1000,
      keepAlive: true,
      maxSockets: Infinity,
      maxFreeSockets: 256,
      scheduling: 'lifo',
      maxTotalSockets: Infinity,
      totalSocketCount: 0,
      maxCachedSessions: 100,
      _sessionCache: [Object],
      [Symbol(shapeMode)]: false,
      [Symbol(kCapture)]: false
    },
    Server: [Function: Server],
    createServer: [Function: createServer],
    get: [Function: get],
    request: [Function: request]
  },
  agentClass: [Function: Agent],
  agent: Agent {
    _events: [Object: null prototype] {
      free: [Function (anonymous)],
      newListener: [Function: maybeEnableKeylog]
    },
    _eventsCount: 2,
    _maxListeners: undefined,
    defaultPort: 443,
    protocol: 'https:',
    options: [Object: null prototype] {
      keepAlive: true,
      scheduling: 'lifo',
      timeout: 5000,
      noDelay: true,
      path: null
    },
    requests: [Object: null prototype] {},
    sockets: [Object: null prototype] {},
    freeSockets: [Object: null prototype] {},
    keepAliveMsecs: 1000,
    keepAlive: true,
    maxSockets: Infinity,
    maxFreeSockets: 256,
    scheduling: 'lifo',
    maxTotalSockets: Infinity,
    totalSocketCount: 0,
    maxCachedSessions: 100,
    _sessionCache: { map: {}, list: [] },
    [Symbol(shapeMode)]: false,
    [Symbol(kCapture)]: false
  },
  [Symbol(shapeMode)]: false,
  [Symbol(kCapture)]: false
}

On subsequent calls, this is what logging the stream request outputs:

{
  _events: [Object: null prototype] { pipe: [Function (anonymous)] },
  _eventsCount: 1,
  _maxListeners: undefined,
  readable: true,
  writable: true,
  _qs: Querystring {
    request: [Circular *1],
    lib: {
      formats: [Object],
      parse: [Function (anonymous)],
      stringify: [Function (anonymous)]
    },
    useQuerystring: undefined,
    parseOptions: {},
    stringifyOptions: {}
  },
  _auth: Auth {
    request: [Circular *1],
    hasAuth: false,
    sentAuth: false,
    bearerToken: null,
    user: null,
    pass: null
  },
  _oauth: OAuth { request: [Circular *1], params: null },
  _multipart: Multipart {
    request: [Circular *1],
    boundary: '02ace725-0e14-425d-8f21-f69e9835eef1',
    chunked: false,
    body: null
  },
  _redirect: Redirect {
    request: [Circular *1],
    followRedirect: true,
    followRedirects: true,
    followAllRedirects: false,
    followOriginalHttpMethod: false,
    allowRedirect: [Function (anonymous)],
    maxRedirects: 10,
    redirects: [],
    redirectsFollowed: 0,
    removeRefererHeader: false
  },
  _tunnel: Tunnel {
    request: [Circular *1],
    proxyHeaderWhiteList: [
      'accept',           'accept-charset',
      'accept-encoding',  'accept-language',
      'accept-ranges',    'cache-control',
      'content-encoding', 'content-language',
      'content-location', 'content-md5',
      'content-range',    'content-type',
      'connection',       'date',
      'expect',           'max-forwards',
      'pragma',           'referer',
      'te',               'user-agent',
      'via'
    ],
    proxyHeaderExclusiveList: []
  },
  headers: { host: 'x.api.y.com' },
  setHeader: [Function (anonymous)],
  hasHeader: [Function (anonymous)],
  getHeader: [Function (anonymous)],
  removeHeader: [Function (anonymous)],
  method: 'GET',
  localAddress: undefined,
  pool: {},
  dests: [],
  __isRequestRequest: true,
  uri: Url {
    protocol: 'https:',
    slashes: true,
    auth: null,
    host: 'x.api.y.com',
    port: 443,
    hostname: 'x.api.y.com',
    hash: null,
    search: '?date=2024-02-10T10%3A37%3A53',
    query: 'date=2024-02-10T10%3A37%3A53',
    pathname: '/stream',
    path: '/stream?date=2024-02-10T10%3A37%3A53',
    href: 'https://x.api.y.com/stream?date=2024-02-10T10%3A37%3A53'
  },
  proxy: null,
  tunnel: true,
  setHost: true,
  originalCookieHeader: undefined,
  _disableCookies: true,
  _jar: undefined,
  port: 443,
  host: 'x.api.y.com',
  path: '/stream?date=2024-02-10T10%3A37%3A53',
  httpModule: {
    Agent: [Function: Agent],
    globalAgent: Agent {
      _events: [Object: null prototype],
      _eventsCount: 2,
      _maxListeners: undefined,
      defaultPort: 443,
      protocol: 'https:',
      options: [Object: null prototype],
      requests: [Object: null prototype] {},
      sockets: [Object: null prototype] {},
      freeSockets: [Object: null prototype] {},
      keepAliveMsecs: 1000,
      keepAlive: true,
      maxSockets: Infinity,
      maxFreeSockets: 256,
      scheduling: 'lifo',
      maxTotalSockets: Infinity,
      totalSocketCount: 0,
      maxCachedSessions: 100,
      _sessionCache: [Object],
      [Symbol(shapeMode)]: false,
      [Symbol(kCapture)]: false
    },
    Server: [Function: Server],
    createServer: [Function: createServer],
    get: [Function: get],
    request: [Function: request]
  },
  agentClass: [Function: Agent],
  agent: Agent {
    _events: [Object: null prototype] {
      free: [Function (anonymous)],
      newListener: [Function: maybeEnableKeylog]
    },
    _eventsCount: 2,
    _maxListeners: undefined,
    defaultPort: 443,
    protocol: 'https:',
    options: [Object: null prototype] {
      keepAlive: true,
      scheduling: 'lifo',
      timeout: 5000,
      noDelay: true,
      path: null
    },
    requests: [Object: null prototype] {},
    sockets: [Object: null prototype] {},
    freeSockets: [Object: null prototype] {},
    keepAliveMsecs: 1000,
    keepAlive: true,
    maxSockets: Infinity,
    maxFreeSockets: 256,
    scheduling: 'lifo',
    maxTotalSockets: Infinity,
    totalSocketCount: 0,
    maxCachedSessions: 100,
    _sessionCache: { map: [Object], list: [Array] },
    [Symbol(shapeMode)]: false,
    [Symbol(kCapture)]: false
  },
  [Symbol(shapeMode)]: false,
  [Symbol(kCapture)]: false
}
0

There are 0 answers