NaturalCycles/nodejs-lib

View on GitHub
src/stream/transform/transformMap.ts

Summary

Maintainability
C
1 day
Test Coverage
B
84%
import {
  _anyToError,
  _hc,
  _since,
  _stringify,
  AbortableAsyncMapper,
  AsyncPredicate,
  CommonLogger,
  END,
  ErrorMode,
  pFilter,
  Promisable,
  SKIP,
  StringMap,
  UnixTimestampMillisNumber,
} from '@naturalcycles/js-lib'
import through2Concurrent = require('through2-concurrent')
import { yellow } from '../../colors/colors'
import { AbortableTransform } from '../pipeline/pipeline'
import { TransformTyped } from '../stream.model'
import { pipelineClose } from '../stream.util'

/**
 * Options accepted by `transformMap`.
 */
export interface TransformMapOptions<IN = any, OUT = IN> {
  /**
   * Set true to support "multiMap" - possibility to return an array from `mapper`
   * and emit 1 result for each item in the array (detected via Array.isArray()).
   * When not set, an array returned by `mapper` is emitted as a single item.
   *
   * @default false
   */
  flattenArrayOutput?: boolean

  /**
   * Predicate to filter outgoing results (after mapper).
   * Allows to not emit all results.
   * It is called with each result and the index of the input item that produced it.
   *
   * Defaults to "pass everything" (including null, undefined, etc).
   * Simpler way to exclude certain cases is to return SKIP symbol from the mapper.
   */
  predicate?: AsyncPredicate<OUT>

  /**
   * Number of concurrently pending promises returned by `mapper`.
   *
   * Default is 32.
   * It was recently raised from 16, after some testing showed that
   * for simple low-cpu mapper functions 32 produces almost 2x throughput.
   * For example, in scenarios like streaming a query from Datastore.
   */
  concurrency?: number

  /**
   * How errors thrown by `mapper` (or by `predicate`) are handled.
   *
   * @default THROW_IMMEDIATELY
   */
  errorMode?: ErrorMode

  /**
   * If defined - will be called on every error happening in the stream,
   * with the error (normalized to an Error instance) and the input item that caused it.
   * Called before the error is handled according to `errorMode`
   * (e.g. before it is emitted in THROW_IMMEDIATELY mode).
   * Exceptions thrown by `onError` itself are caught and do not propagate.
   */
  onError?: (err: Error, input: IN) => any

  /**
   * A hook that is called when the last item is finished processing.
   * stats object is passed, containing countIn and countOut -
   * number of items that entered the transform and number of items that left it.
   *
   * Callback is called **before** [possible] Aggregated error is thrown,
   * and before [possible] THROW_IMMEDIATELY error.
   *
   * onDone callback will be awaited before Error is thrown.
   * Errors thrown by onDone are logged via `logger` and do not propagate.
   */
  onDone?: (stats: TransformMapStats) => Promisable<any>

  /**
   * Name used as a prefix in the error-count log lines.
   *
   * @default `stream`
   */
  metric?: string

  /**
   * Logger used for errors and diagnostic messages.
   *
   * @default console
   */
  logger?: CommonLogger
}

/**
 * Statistics passed to the `onDone` hook of `transformMap`.
 */
export interface TransformMapStats {
  /**
   * True if transform was successful (didn't throw Immediate or Aggregated error).
   */
  ok: boolean
  /**
   * Only populated for ErrorMode.THROW_AGGREGATED; an empty array otherwise.
   */
  collectedErrors: Error[]
  /**
   * Number of errors thrown while processing items (by `mapper` or `predicate`).
   */
  countErrors: number
  /**
   * Number of input items that were handed to `mapper`
   * (items arriving after the stream was settled are not counted).
   */
  countIn: number
  /**
   * Number of results that were pushed downstream.
   */
  countOut: number
  /**
   * Timestamp (Date.now()) of when the transform was created.
   */
  started: UnixTimestampMillisNumber
}

/**
 * Input for `transformMapStatsSummary`: the stats of a transform plus optional presentation fields.
 */
export interface TransformMapStatsSummary extends TransformMapStats {
  /**
   * Name of the summary, defaults to `Transform`
   */
  name?: string

  /**
   * Allows to pass extra key-value object, which will be rendered as:
   * key: value
   * key2: value2
   * (values are stringified)
   */
  extra?: StringMap<any>
}

// Doesn't work, because here we don't construct the Transform instance ourselves (through2-concurrent does):
// export class TransformMap extends AbortableTransform {}

/**
 * Like pMap, but for streams.
 * Inspired by `through2`.
 * Main feature is concurrency control (implemented via `through2-concurrent`) and convenient options.
 * Using this allows native stream .pipe() to work and use backpressure.
 *
 * Only works in objectMode (due to through2Concurrent).
 *
 * Concurrency defaults to 32.
 *
 * If `flattenArrayOutput` is set and an Array is returned by `mapper` - it will be flattened
 * and multiple results will be emitted from it. Tested by Array.isArray().
 *
 * `mapper` may return SKIP to emit nothing for the current item, or END to stop accepting
 * further input and close the pipeline.
 *
 * `onDone` is invoked at most once per stream, even if several concurrent items fail.
 */
export function transformMap<IN = any, OUT = IN>(
  mapper: AbortableAsyncMapper<IN, OUT | typeof SKIP | typeof END>,
  opt: TransformMapOptions<IN, OUT> = {},
): TransformTyped<IN, OUT> {
  const {
    concurrency = 32,
    predicate, // we now default to "no predicate" (meaning pass-everything)
    errorMode = ErrorMode.THROW_IMMEDIATELY,
    flattenArrayOutput,
    onError,
    onDone,
    metric = 'stream',
    logger = console,
  } = opt

  const started = Date.now()
  let index = -1
  let countOut = 0
  let isSettled = false
  let onDoneCalled = false
  let errors = 0
  const collectedErrors: Error[] = [] // only used if errorMode == THROW_AGGREGATED

  return through2Concurrent.obj(
    {
      maxConcurrency: concurrency,
      async final(cb) {
        logErrorStats(true)

        if (collectedErrors.length) {
          await callOnDone(false)

          // emit Aggregated error
          cb(
            new AggregateError(
              collectedErrors,
              `transformMap resulted in ${collectedErrors.length} error(s)`,
            ),
          )
        } else {
          // emit no error
          await callOnDone(true)
          cb()
        }
      },
    },
    async function transformMapFn(this: AbortableTransform, chunk: IN, _, cb) {
      // Stop processing if isSettled (either THROW_IMMEDIATELY was fired or END received)
      if (isSettled) return cb()

      const currentIndex = ++index
      // Per-item flag: true only if THIS item's mapper output contained END.
      // `isSettled` is shared across concurrent items, so it cannot be used to decide
      // whether this particular item should close the pipeline.
      let endReceived = false

      try {
        const res = await mapper(chunk, currentIndex)
        const passedResults = await pFilter(
          flattenArrayOutput && Array.isArray(res) ? res : [res],
          async r => {
            if (r === END) {
              endReceived = true
              isSettled = true // stops accepting new items
              return false
            }
            return r !== SKIP && (!predicate || (await predicate(r, currentIndex)))
          },
        )

        countOut += passedResults.length
        passedResults.forEach(r => this.push(r))

        if (endReceived) {
          logger.log(`transformMap END received at index ${currentIndex}`)
          pipelineClose('transformMap', this, this.sourceReadable, this.streamDone, logger)
        }

        cb() // done processing
      } catch (err) {
        logger.error(err)
        errors++
        logErrorStats()

        const error = _anyToError(err)

        if (onError) {
          try {
            onError(error, chunk)
          } catch (onErrorErr) {
            // onError is best-effort: never let it break the stream, but don't hide its failure
            logger.error(onErrorErr)
          }
        }

        if (errorMode === ErrorMode.THROW_IMMEDIATELY) {
          isSettled = true
          await callOnDone(false)
          return cb(err) // Emit error immediately
        }

        if (errorMode === ErrorMode.THROW_AGGREGATED) {
          collectedErrors.push(error)
        }

        // Tell input stream that we're done processing, but emit nothing to output - not error nor result
        cb()
      }
    },
  )

  /**
   * Invokes the `onDone` hook at most once, awaiting it.
   * Errors thrown by the hook are logged and not rethrown.
   */
  async function callOnDone(ok: boolean): Promise<void> {
    if (onDoneCalled) return
    onDoneCalled = true

    try {
      await onDone?.({
        ok,
        collectedErrors,
        countErrors: errors,
        countIn: index + 1,
        countOut,
        started,
      })
    } catch (err) {
      logger.error(err)
    }
  }

  /**
   * Logs the current error count (if any), prefixed by `metric`.
   */
  function logErrorStats(final = false): void {
    if (!errors) return
    logger.log(`${metric} ${final ? 'final ' : ''}errors: ${yellow(errors)}`)
  }
}

/**
 * Formats a TransformMapStatsSummary as human-readable, Markdown-friendly text,
 * suitable e.g. for a GitHub Actions job summary or a Slack message.
 *
 * Lines, in order: a `### <name> summary` heading, time spent since `started`,
 * rows in / out, the error count (omitted when zero), then one `key: value` line per `extra` entry.
 */
export function transformMapStatsSummary(summary: TransformMapStatsSummary): string {
  const { name = 'Transform', started, countIn, countOut, countErrors, extra = {} } = summary

  const lines: string[] = [
    `### ${name} summary\n`,
    `${_since(started)} spent`,
    `${_hc(countIn)} / ${_hc(countOut)} row(s) in / out`,
  ]

  if (countErrors) {
    lines.push(`${countErrors} error(s)`)
  }

  for (const [key, value] of Object.entries(extra)) {
    lines.push(`${key}: ${_stringify(value)}`)
  }

  // Every line above contains literal text, so none is empty and no filtering is needed.
  return lines.join('\n')
}