NaturalCycles/nodejs-lib

View on GitHub
src/stream/transform/transformChunk.ts

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
import { Transform } from 'node:stream'
import { TransformOptions, TransformTyped } from '../stream.model'

export interface TransformChunkOptions extends TransformOptions {
  /**
   * How many items to include in each chunk.
   * Last chunk will contain the remaining items, possibly less than chunkSize.
   */
  chunkSize: number
}

/**
 * Similar to RxJS bufferCount(),
 * allows to "chunk" the input stream into chunks of `opt.chunkSize` size.
 * Last chunk will contain the remaining items, possibly less than chunkSize.
 */
export function transformChunk<IN = any>(opt: TransformChunkOptions): TransformTyped<IN, IN[]> {
  const { chunkSize } = opt

  let buf: IN[] = []

  return new Transform({
    objectMode: true,
    ...opt,
    transform(chunk, _, cb) {
      buf.push(chunk)

      if (buf.length >= chunkSize) {
        cb(null, buf)
        buf = []
      } else {
        cb()
      }
    },
    final(this: Transform, cb) {
      if (buf.length) {
        this.push(buf)
        buf = []
      }

      cb()
    },
  })
}