NaturalCycles/nodejs-lib

View on GitHub
src/stream/transform/transformLimit.ts

Summary

Maintainability
A
35 mins
Test Coverage
A
95%
import { Readable } from 'node:stream'
import { CommonLogger } from '@naturalcycles/js-lib'
import { AbortableTransform, transformNoOp } from '../../index'
import { TransformOptions, TransformTyped } from '../stream.model'
import { pipelineClose } from '../stream.util'

export interface TransformLimitOptions extends TransformOptions {
  /**
   * Maximum number of chunks to let through.
   * Any falsy value (e.g. 0 or undefined) means "no limit":
   * `transformLimit` then returns a plain pass-through transform.
   */
  limit?: number

  /**
   * If provided (recommended!) - it will call readable.destroy() on limit.
   * Without it - it will only stop the downstream consumers, but won't stop
   * the Readable ("source" of the stream).
   * It is almost always crucial to stop the Source too, so, please provide the Readable here!
   */
  sourceReadable?: Readable

  /**
   * Please provide it (a Promise that resolves when the Stream is done, e.g. finished consuming things)
   * to be able to wait for Consumers before calling `readable.destroy`.
   * Has no effect if `sourceReadable` is not provided.
   */
  streamDone?: Promise<void>

  /**
   * Logger that receives the "limit reached" message and the optional debug messages.
   * Defaults to `console`.
   */
  logger?: CommonLogger

  /**
   * Set to true to enable additional debug messages, e.g. it'll log
   * when the transform still receives chunks after the limit is reached.
   */
  debug?: boolean
}

/**
 * Marker subclass of AbortableTransform; it adds no members of its own.
 *
 * The class only exists to be able to do `instanceof TransformLimit`
 * and to set sourceReadable+streamDone to it in `_pipeline`.
 */
export class TransformLimit extends AbortableTransform {}

/**
 * Builds a transform that forwards chunks unchanged until `opt.limit` chunks
 * have been seen, then closes the pipeline and drops everything that follows.
 *
 * - A falsy `limit` (0, undefined, ...) disables limiting: a no-op
 *   pass-through transform is returned instead.
 * - The chunk that hits the limit is still emitted downstream; right after
 *   that `pipelineClose` is invoked with the source Readable and the
 *   "stream done" promise, taken from `opt` or, as a fallback, from the
 *   transform instance itself.
 * - Chunks arriving after the limit are silently discarded (and logged when
 *   `opt.debug` is set).
 *
 * @param opt limit settings plus regular transform options, which are spread
 *   into the underlying transform's constructor options.
 * @returns a pass-through transform capped at `opt.limit` chunks.
 */
export function transformLimit<IN>(opt: TransformLimitOptions): TransformTyped<IN, IN> {
  const { limit: maxChunks, debug: verbose, logger: log = console } = opt

  // Nothing to enforce - hand back a plain pass-through transform
  if (!maxChunks) {
    return transformNoOp()
  }

  // Number of chunks received so far; incremented first, so the first chunk is number 1
  let seen = 0
  let limitReached = false

  return new TransformLimit({
    objectMode: true,
    ...opt,
    transform(this: TransformLimit, chunk, _encoding, done) {
      seen++

      if (seen === maxChunks) {
        // This is the last chunk we let through
        limitReached = true
        log.log(`transformLimit of ${maxChunks} reached`)
        this.push(chunk)

        // Explicit options win; otherwise fall back to what was set on the instance
        const source = opt.sourceReadable || this.sourceReadable
        const whenDone = opt.streamDone || this.streamDone
        pipelineClose('transformLimit', this, source, whenDone, log)

        done()
        return
      }

      if (limitReached) {
        if (verbose) log.log(`transformLimit.transform after limit`, seen)
        // The callback must always be invoked: if we ever HANG (don't call it),
        // Node will do process.exit(0) to us
        done() // past the limit, don't emit anything
        return
      }

      // Still under the limit - forward the chunk untouched
      done(null, chunk)
    },
  })
}