NaturalCycles/nodejs-lib

View on GitHub
src/stream/ndjson/ndjsonMap.ts

Summary

Maintainability
A
0 mins
Test Coverage
F
42%
import { AbortableAsyncMapper, ErrorMode } from '@naturalcycles/js-lib'
import {
  transformLimit,
  transformLogProgress,
  transformMap,
  TransformMapOptions,
  _pipeline,
  TransformLogProgressOptions,
  fs2,
} from '../..'

export interface NDJSONMapOptions<IN = any, OUT = IN>
  extends TransformMapOptions<IN, OUT>,
    TransformLogProgressOptions<IN> {
  inputFilePath: string
  outputFilePath: string

  limitInput?: number
  limitOutput?: number

  /**
   * @default 100_000
   */
  logEveryOutput?: number

  /**
   * Defaults to `true` for ndjsonMap
   *
   * @default true
   */
  flattenArrayOutput?: boolean
}

/**
 * Unzips input file automatically, if it ends with `.gz`.
 * Zips output file automatically, if it ends with `.gz`.
 */
export async function ndjsonMap<IN = any, OUT = any>(
  mapper: AbortableAsyncMapper<IN, OUT>,
  opt: NDJSONMapOptions<IN, OUT>,
): Promise<void> {
  const { inputFilePath, outputFilePath, logEveryOutput = 100_000, limitInput, limitOutput } = opt

  console.log({
    inputFilePath,
    outputFilePath,
  })

  const readable = fs2
    .createReadStreamAsNDJSON(inputFilePath)
    .take(limitInput || Number.POSITIVE_INFINITY)

  await _pipeline([
    readable,
    transformLogProgress({ metric: 'read', ...opt }),
    transformMap(mapper, {
      flattenArrayOutput: true,
      errorMode: ErrorMode.SUPPRESS,
      ...opt,
    }),
    transformLimit({ limit: limitOutput, sourceReadable: readable }),
    transformLogProgress({ metric: 'saved', logEvery: logEveryOutput }),
    ...fs2.createWriteStreamAsNDJSON(outputFilePath),
  ])
}