// NaturalCycles/nodejs-lib
//
// View on GitHub
// src/stream/transform/transformSplit.ts
//
// Summary
//
// Maintainability: B (5 hrs)
// Test Coverage: F (6%)
import { Transform } from 'node:stream'
import { TransformTyped } from '../stream.model'

// The code below is carefully adapted from: https://github.com/max-mapper/binary-split

/**
 * Creates a Transform that takes a Buffer/string byte stream on its writable side and emits
 * (readableObjectMode: true) one Buffer per line, splitting on NEWLINE (aka `\n`).
 *
 * Handy for reading NDJSON files from fs.
 *
 * Specialized (hard-coded delimiter) counterpart of the generic `binarySplit`.
 * (The original authors report a +5-10% _pipeline speedup compared to the generic,
 * variable-length-delimiter version.)
 *
 * Empty lines are not emitted. Bytes after the last newline are held back until more input
 * arrives, and are emitted on flush if non-empty.
 */
export function transformSplitOnNewline(): TransformTyped<Buffer, Buffer> {
  // Unterminated tail of the previously processed input, still waiting for its newline
  let pending: Buffer | undefined

  return new Transform({
    readableObjectMode: true,
    writableHighWaterMark: 64 * 1024,

    transform(chunk: Buffer, _enc, cb) {
      let data = chunk
      let searchFrom = 0

      if (pending) {
        // The carried-over bytes were already scanned, so resume right after them
        searchFrom = pending.length
        data = Buffer.concat([pending, chunk])
        pending = undefined
      }

      let lineStart = 0
      let newlineAt = firstNewlineMatch(data, searchFrom)

      while (newlineAt !== -1 && newlineAt < data.length) {
        // Skip empty lines (newline sitting right at the start of the line)
        if (newlineAt !== lineStart) {
          this.push(data.subarray(lineStart, newlineAt))
        }
        searchFrom = newlineAt + 1
        lineStart = searchFrom
        newlineAt = firstNewlineMatch(data, searchFrom)
      }

      // Whatever follows the last newline is kept for the next chunk (or for flush)
      pending = data.subarray(lineStart)
      cb()
    },

    flush(cb) {
      if (pending !== undefined && pending.length > 0) {
        this.push(pending)
      }
      cb()
    },
  })
}

/**
 * Input: stream (objectMode=false) of arbitrary string|Buffer chunks, like when read from fs
 * Output: stream (objectMode=true) of Buffer chunks split by `separator` (@default `\n`)
 *
 * Empty segments (two separators back to back, or a leading separator) are not emitted.
 * Bytes after the last separator are held back until more input arrives, and are emitted
 * on flush if non-empty.
 *
 * Please use slightly more optimized `transformSplitOnNewline` for NDJSON file parsing.
 * (+5-10% _pipeline speedup measured!)
 *
 * @param separator delimiter to split on; may be longer than one byte
 */
export function transformSplit(separator = '\n'): TransformTyped<Buffer, Buffer> {
  const matcher = Buffer.from(separator)
  // Unterminated tail of the previously processed input, still waiting for its separator
  let buffered: Buffer | undefined

  return new Transform({
    readableObjectMode: true,
    writableHighWaterMark: 64 * 1024,

    transform(buf: Buffer, enc, done) {
      let offset = 0
      let lastMatch = 0
      if (buffered) {
        buf = Buffer.concat([buffered, buf])
        offset = buffered.length
        buffered = undefined
      }

      while (true) {
        // Back up by (matcher.length - 1) so that a separator straddling the boundary between
        // the carried-over tail and the new chunk is still found - but never back up into
        // bytes that belong to an already consumed separator. Without the clamp, a
        // self-overlapping separator (e.g. `aa` in `xaaay`) is matched a second time at an
        // index before `lastMatch`, which emits an empty chunk and drops payload bytes.
        const searchFrom = Math.max(offset - matcher.length + 1, lastMatch)
        const idx = firstMatch(buf, searchFrom, matcher)

        // Both -1 and any index outside of buf mean "no (further) separator"
        if (idx === -1 || idx >= buf.length) break

        if (lastMatch !== idx) {
          this.push(buf.subarray(lastMatch, idx))
        }
        offset = idx + matcher.length
        lastMatch = offset
      }

      // Keep the unterminated remainder for the next chunk (or for flush)
      buffered = buf.subarray(lastMatch)
      done()
    },

    flush(done) {
      if (buffered && buffered.length > 0) this.push(buffered)
      done()
    },
  })
}

// Byte value of `\n`, i.e. what `Buffer.from('\n')[0]` evaluates to
const NEWLINE_CODE = 10

/**
 * Specialized counterpart of firstMatch for the single-byte NEWLINE (aka `\n`) delimiter.
 *
 * Scans `buf` from index `offset` onwards and returns the index of the first newline byte.
 * Returns -1 when there is none, which includes `offset` being at or past the end of `buf`.
 */
function firstNewlineMatch(buf: Buffer, offset: number): number {
  const end = buf.length
  let pos = offset

  while (pos < end) {
    if (buf[pos] === NEWLINE_CODE) return pos
    pos++
  }

  // Reached whenever buf has no newline at or after `offset`
  return -1
}

/**
 * Finds the first occurrence of the byte sequence `matcher` in `buf`, searching from
 * index `offset` onwards.
 *
 * @param buf haystack to scan
 * @param offset index to start searching at; negative values are treated as 0
 * @param matcher byte sequence to look for
 * @returns index at which the first complete match starts, or -1 when there is none.
 *  A match cut off by the end of `buf` does not count. An empty `matcher` never matches.
 */
function firstMatch(buf: Buffer, offset: number, matcher: Buffer): number {
  // An empty delimiter cannot delimit anything; Buffer.indexOf would report a match at `offset`
  if (matcher.length === 0) return -1

  // Buffer.indexOf interprets a negative byteOffset as relative to the end of buf, so clamp it
  return buf.indexOf(matcher, Math.max(offset, 0))
}