NaturalCycles/nodejs-lib

View on GitHub
src/stream/transform/transformMapSync.ts

Summary

Maintainability
C
1 day
Test Coverage
C
76%
import {
  _anyToError,
  CommonLogger,
  END,
  ErrorMode,
  Mapper,
  Predicate,
  SKIP,
} from '@naturalcycles/js-lib'
import { yellow } from '../../colors/colors'
import { AbortableTransform } from '../pipeline/pipeline'
import { TransformTyped } from '../stream.model'
import { pipelineClose } from '../stream.util'
import { TransformMapStats } from './transformMap'

/**
 * Options accepted by `transformMapSync`.
 *
 * `IN` is the type of items entering the transform, `OUT` is the type of items leaving it.
 */
export interface TransformMapSyncOptions<IN = any, OUT = IN> {
  /**
   * Forwarded to the constructor of the underlying stream.
   *
   * @default true
   */
  objectMode?: boolean

  /**
   * Set to true to support "multiMap": when the mapper returns an array,
   * each element of that array is emitted as a separate result.
   * When false, a returned array is emitted as a single item.
   *
   * @default false
   */
  flattenArrayOutput?: boolean

  /**
   * Predicate to filter outgoing results (applied after the mapper).
   * Allows to not emit all results. Receives the result and the index of the input item.
   *
   * Defaults to "pass everything".
   * A simpler way to skip an individual entry is to return the SKIP symbol from the mapper
   * (SKIP and END results are dropped before the predicate is consulted).
   */
  predicate?: Predicate<OUT>

  /**
   * @default THROW_IMMEDIATELY
   */
  errorMode?: ErrorMode

  /**
   * If defined - will be called on every error thrown while processing an item,
   * with the error (converted to an Error instance) and the input item that caused it.
   *
   * Called BEFORE the error is emitted by the stream (errorMode THROW_IMMEDIATELY)
   * or collected for later (errorMode THROW_AGGREGATED).
   * Exceptions thrown by this hook itself do not propagate to the stream.
   */
  onError?: (err: Error, input: IN) => any

  /**
   * A hook that is called when the last item is finished processing.
   * stats object is passed, containing countIn and countOut -
   * number of items that entered the transform and number of items that left it.
   *
   * Callback is called **before** [possible] Aggregated error is thrown,
   * and before [possible] THROW_IMMEDIATELY error.
   *
   * Exceptions thrown by this hook are logged via `logger` and not rethrown.
   */
  onDone?: (stats: TransformMapStats) => any

  /**
   * Progress metric name. Used as the prefix of the error-count log lines.
   *
   * @default `stream`
   */
  metric?: string

  /**
   * Logger used for error and END messages.
   *
   * @default console
   */
  logger?: CommonLogger
}

/**
 * Stream class instantiated by `transformMapSync`.
 * Adds no members of its own on top of `AbortableTransform`.
 */
export class TransformMapSync extends AbortableTransform {}

/**
 * Sync (not async) version of transformMap.
 * Supposedly faster, for cases when async is not needed.
 *
 * Each incoming chunk is passed to `mapper` together with its zero-based index.
 * The mapper result is then handled as follows:
 * - `SKIP` - nothing is emitted for this chunk.
 * - `END` - nothing is emitted, the transform is marked as settled, `pipelineClose` is called,
 *   and every chunk arriving afterwards is dropped without being mapped.
 * - anything else - emitted downstream, provided `opt.predicate` (if any) returns truthy.
 *
 * With `opt.flattenArrayOutput` an array result is unpacked and the rules above
 * are applied to every element individually.
 *
 * Errors thrown while processing a chunk are logged, counted, reported to `opt.onError`
 * and then handled according to `opt.errorMode`:
 * - THROW_IMMEDIATELY (default) - `onDone` is called, then the error is emitted by the stream.
 * - THROW_AGGREGATED - the error is collected; an AggregateError is emitted when the stream finishes.
 * - any other mode - the error is only logged/counted and processing continues.
 *
 * @param mapper synchronous function mapping an input item (and its index) to an output item,
 *   `SKIP` or `END`
 * @param opt see TransformMapSyncOptions
 * @returns a transform stream consuming `IN` items and producing `OUT` items
 */
export function transformMapSync<IN = any, OUT = IN>(
  mapper: Mapper<IN, OUT | typeof SKIP | typeof END>,
  opt: TransformMapSyncOptions = {},
): TransformTyped<IN, OUT> {
  const {
    predicate, // defaults to "no predicate" (pass everything)
    errorMode = ErrorMode.THROW_IMMEDIATELY,
    flattenArrayOutput = false,
    onError,
    onDone,
    metric = 'stream',
    objectMode = true,
    logger = console,
  } = opt

  const started = Date.now()
  let index = -1 // index of the last chunk that entered the mapper
  let countOut = 0 // number of items pushed downstream
  let isSettled = false // true after END was received or a THROW_IMMEDIATELY error happened
  let errors = 0
  const collectedErrors: Error[] = [] // only used if errorMode == THROW_AGGREGATED

  return new TransformMapSync({
    ...opt,
    // Must come AFTER the spread: otherwise an explicitly passed `objectMode: undefined`
    // would overwrite the resolved default (true).
    objectMode,
    transform(this: AbortableTransform, chunk: IN, _, cb) {
      // Stop processing if isSettled
      if (isSettled) return cb()

      const currentIndex = ++index

      try {
        // map and pass through
        const v = mapper(chunk, currentIndex)

        const passedResults = (flattenArrayOutput && Array.isArray(v) ? v : [v]).filter(r => {
          if (r === END) {
            isSettled = true // will be checked later
            return false
          }
          return r !== SKIP && (!predicate || predicate(r, currentIndex))
        })

        countOut += passedResults.length
        passedResults.forEach(r => this.push(r))

        if (isSettled) {
          logger.log(`transformMapSync END received at index ${currentIndex}`)
          pipelineClose('transformMapSync', this, this.sourceReadable, this.streamDone, logger)
        }

        cb() // done processing
      } catch (err) {
        logger.error(err)
        errors++

        logErrorStats()

        if (onError) {
          try {
            onError(_anyToError(err), chunk)
          } catch (hookErr) {
            // A failing hook must never break the stream, but it should not fail silently either
            logger.error(hookErr)
          }
        }

        if (errorMode === ErrorMode.THROW_IMMEDIATELY) {
          isSettled = true

          // onDone is called BEFORE the error is emitted
          callOnDone(false)

          // Emit error immediately
          return cb(err as Error)
        }

        if (errorMode === ErrorMode.THROW_AGGREGATED) {
          collectedErrors.push(err as Error)
        }

        cb()
      }
    },
    final(cb) {
      logErrorStats(true)

      const ok = collectedErrors.length === 0

      // onDone is called BEFORE the [possible] Aggregated error is emitted
      callOnDone(ok)

      if (ok) {
        // emit no error
        return cb()
      }

      // emit Aggregated error
      cb(
        new AggregateError(
          collectedErrors,
          `transformMapSync resulted in ${collectedErrors.length} error(s)`,
        ),
      )
    },
  })

  /**
   * Invokes the onDone hook (if defined) with a snapshot of the current stats.
   * Exceptions thrown by the hook are logged and never propagated.
   */
  function callOnDone(ok: boolean): void {
    try {
      onDone?.({
        ok,
        collectedErrors,
        countErrors: errors,
        countIn: index + 1,
        countOut,
        started,
      })
    } catch (err) {
      logger.error(err)
    }
  }

  /**
   * Logs the number of errors seen so far. Logs nothing while there are no errors.
   */
  function logErrorStats(final = false): void {
    if (!errors) return

    logger.log(`${metric} ${final ? 'final ' : ''}errors: ${yellow(errors)}`)
  }
}