How do I get a NodeJS Transform stream to emit multiple records for each chunk of input?

1.9k views Asked by At

I am writing a Transform stream that takes an incoming stream of XML and emits a subset of that as JS objects.

<item>
  <name>some name</name>
  <files>
    <file path='path/to/file'/>
    <file path='path/to//other/file'/>
  </files>
</item>

needs to write two item objects

{
  name: 'some name',
  file: 'path/to/file
}

and

{
  name: 'some name',
  file: 'path/to/other/file
}

as it's then going to be piped into a CSV writer.

The code looks like

import fs from 'fs'
import expat from 'node-expat'
import { Transform } from 'stream'

const xParser = (filePath) => {
  const stream = fs.createReadStream(filePath)
  const parser = new expat.Parser('UTF-8')
  const tranny = Transform({ objectMode: true })

  tranny._transform = function transform(data, encoding, done) {
    this.push(data)
    done()
  }

  let current
  let inItem = false
  let tag
  let attributes

  const startTag = (name, attrs) => {
    if (name === 'item') {
      inItem = true
      current = {
        name: '',
        files: []
      }
      return
    }
    if (!inItem) return
    tag = name
    attributes = attrs
    if (tag === 'file') {
      current.files.push(attributes.path)
    }
  }

  const write = (file) => {
    tranny.write({
      name: current.name,
      file
    })
  }

  const endTag = (name) => {
    if (name === 'item') {
      // console.log('end item', JSON.stringify(current))
      inItem = false
      if (current.files.length === 0) {
        write('')
      } else {
        current.files.forEach(file => {
          write(file)
        })
      }
      tag = undefined
      attributes = undefined
    }
  }

  const handleText = (text) => {
    if (!inItem) return
    if (tag === 'name') {
      current.name = current.name.concat(text).trim()
    }
  }

  const handleDone = () => {
    console.log('done')
    tranny.end()
  }

  parser
  .on('startElement', startTag)
  .on('endElement', endTag)
  .on('text', handleText)
  .on('end', handleDone)

  stream.pipe(parser)
  return tranny
}

module.exports = xParser

when the xml is simply

<item>
  <name>some name</name>
  <files>
    <file path='path/to/file'/>
  </files>
</item>

this works fine but when it hits an item with multiple filed it just stops and the 'end' triggers as soon as the second tranny.write happens.

How do I get a Transform stream to emit multiple records for each chunk of input?

1

There are 1 answers

0
Dave Sag On

Okay fixed it.

This solution was to not do multiple tranny.writes but to do the data transformation within the tranny._transform function (as ought to have been obvious really).

so like

tranny._transform = function transform(data, encoding, done) {
  if (data.files.length > 0) {
    data.files.forEach((file) => {
      this.push({
        name: data.name,
        file
      })
    })
  } else {
    this.push({
      name: data.name,
      file: ''
    })
  }
  done()
}

and the endTag handler now looks like

const endTag = (name) => {
  if (name === 'item') {
    inItem = false
    tranny.write(current)
    tag = undefined
    attributes = undefined
  }
}

meaning I could also just delete the write function.

Now it works, and verified when I do

import xParser from './xParser'
import concat from 'concat-stream'

xmlParser('data/mybigdatafile.xml')
.pipe(concat((data) => {
  console.log(JSON.stringify(data));
}))

this spits out the full set of data for me.