NaturalCycles/nodejs-lib

View on GitHub
src/fs/fs2.ts

Summary

Maintainability
A
0 mins
Test Coverage
F
42%
/*

Why?

Convenience re-export/re-implementation of most common fs functions from
node:fs, node:fs/promises and fs-extra

Defaults to string input/output, as it's used in 80% of time. For the rest - you can use native fs.

Allows to import it easier, so you don't have to choose between the 3 import locations.
That's why function names are slightly renamed, to avoid conflict.

Credit to: fs-extra (https://github.com/jprichardson/node-fs-extra)

 */

import type { RmOptions } from 'node:fs'
import fs from 'node:fs'
import fsp from 'node:fs/promises'
import path from 'node:path'
import { createGzip, createUnzip } from 'node:zlib'
import { _isTruthy, _jsonParse } from '@naturalcycles/js-lib'
import yaml, { DumpOptions } from 'js-yaml'
import { transformToNDJson } from '../stream/ndjson/transformToNDJson'
import { ReadableTyped, TransformTyped } from '../stream/stream.model'
import { transformSplitOnNewline } from '../stream/transform/transformSplit'
import { requireFileToExist } from '../util/env.util'

/**
 * fs2 conveniently groups filesystem functions together.
 * Supposed to be almost a drop-in replacement for these things together:
 *
 * 1. node:fs
 * 2. node:fs/promises
 * 3. fs-extra
 */
class FS2 {
  // Naming convention is:
  // - async function has Async in the name, e.g readTextAsync
  // - sync function has postfix in the name, e.g readText

  /**
   * Convenience wrapper that defaults to utf-8 string output.
   */
  readText(filePath: string): string {
    return fs.readFileSync(filePath, 'utf8')
  }

  /**
   * Convenience wrapper that defaults to utf-8 string output.
   */
  async readTextAsync(filePath: string): Promise<string> {
    return await fsp.readFile(filePath, 'utf8')
  }

  readBuffer(filePath: string): Buffer {
    return fs.readFileSync(filePath)
  }

  async readBufferAsync(filePath: string): Promise<Buffer> {
    return await fsp.readFile(filePath)
  }

  readJson<T = unknown>(filePath: string): T {
    const str = fs.readFileSync(filePath, 'utf8')
    return _jsonParse(str)
  }

  async readJsonAsync<T = unknown>(filePath: string): Promise<T> {
    const str = await fsp.readFile(filePath, 'utf8')
    return _jsonParse(str)
  }

  readYaml<T = unknown>(filePath: string): T {
    return yaml.load(fs.readFileSync(filePath, 'utf8')) as T
  }

  async readYamlAsync<T = unknown>(filePath: string): Promise<T> {
    return yaml.load(await fsp.readFile(filePath, 'utf8')) as T
  }

  writeFile(filePath: string, data: string | Buffer): void {
    fs.writeFileSync(filePath, data)
  }

  async writeFileAsync(filePath: string, data: string | Buffer): Promise<void> {
    await fsp.writeFile(filePath, data)
  }

  writeJson(filePath: string, data: any, opt?: JsonOptions): void {
    const str = stringify(data, opt)
    fs.writeFileSync(filePath, str)
  }

  async writeJsonAsync(filePath: string, data: any, opt?: JsonOptions): Promise<void> {
    const str = stringify(data, opt)
    await fsp.writeFile(filePath, str)
  }

  writeYaml(filePath: string, data: any, opt?: DumpOptions): void {
    const str = yaml.dump(data, opt)
    fs.writeFileSync(filePath, str)
  }

  async writeYamlAsync(filePath: string, data: any, opt?: DumpOptions): Promise<void> {
    const str = yaml.dump(data, opt)
    await fsp.writeFile(filePath, str)
  }

  appendFile(filePath: string, data: string | Buffer): void {
    fs.appendFileSync(filePath, data)
  }

  async appendFileAsync(filePath: string, data: string | Buffer): Promise<void> {
    await fsp.appendFile(filePath, data)
  }

  outputJson(filePath: string, data: any, opt?: JsonOptions): void {
    const str = stringify(data, opt)
    this.outputFile(filePath, str)
  }

  async outputJsonAsync(filePath: string, data: any, opt?: JsonOptions): Promise<void> {
    const str = stringify(data, opt)
    await this.outputFileAsync(filePath, str)
  }

  outputYaml(filePath: string, data: any, opt?: DumpOptions): void {
    const str = yaml.dump(data, opt)
    this.outputFile(filePath, str)
  }

  async outputYamlAsync(filePath: string, data: any, opt?: DumpOptions): Promise<void> {
    const str = yaml.dump(data, opt)
    await this.outputFileAsync(filePath, str)
  }

  outputFile(filePath: string, data: string | Buffer): void {
    const dirPath = path.dirname(filePath)
    if (!fs.existsSync(dirPath)) {
      this.ensureDir(dirPath)
    }

    fs.writeFileSync(filePath, data)
  }

  async outputFileAsync(filePath: string, data: string | Buffer): Promise<void> {
    const dirPath = path.dirname(filePath)
    if (!(await this.pathExistsAsync(dirPath))) {
      await this.ensureDirAsync(dirPath)
    }

    await fsp.writeFile(filePath, data)
  }

  pathExists(filePath: string): boolean {
    return fs.existsSync(filePath)
  }

  async pathExistsAsync(filePath: string): Promise<boolean> {
    try {
      await fsp.access(filePath)
      return true
    } catch {
      return false
    }
  }

  ensureDir(dirPath: string): void {
    fs.mkdirSync(dirPath, {
      mode: 0o777,
      recursive: true,
    })
  }

  async ensureDirAsync(dirPath: string): Promise<void> {
    await fsp.mkdir(dirPath, {
      mode: 0o777,
      recursive: true,
    })
  }

  ensureFile(filePath: string): void {
    let stats
    try {
      stats = fs.statSync(filePath)
    } catch {}
    if (stats?.isFile()) return

    const dir = path.dirname(filePath)
    try {
      if (!fs.statSync(dir).isDirectory()) {
        // parent is not a directory
        // This is just to cause an internal ENOTDIR error to be thrown
        fs.readdirSync(dir)
      }
    } catch (err) {
      // If the stat call above failed because the directory doesn't exist, create it
      if ((err as any)?.code === 'ENOENT') return this.ensureDir(dir)
      throw err
    }

    fs.writeFileSync(filePath, '')
  }

  async ensureFileAsync(filePath: string): Promise<void> {
    let stats
    try {
      stats = await fsp.stat(filePath)
    } catch {}
    if (stats?.isFile()) return

    const dir = path.dirname(filePath)
    try {
      if (!(await fsp.stat(dir)).isDirectory()) {
        // parent is not a directory
        // This is just to cause an internal ENOTDIR error to be thrown
        await fsp.readdir(dir)
      }
    } catch (err) {
      // If the stat call above failed because the directory doesn't exist, create it
      if ((err as any)?.code === 'ENOENT') return await this.ensureDirAsync(dir)
      throw err
    }

    await fsp.writeFile(filePath, '')
  }

  removePath(fileOrDirPath: string, opt?: RmOptions): void {
    fs.rmSync(fileOrDirPath, { recursive: true, force: true, ...opt })
  }

  async removePathAsync(fileOrDirPath: string, opt?: RmOptions): Promise<void> {
    await fsp.rm(fileOrDirPath, { recursive: true, force: true, ...opt })
  }

  emptyDir(dirPath: string): void {
    let items
    try {
      items = fs.readdirSync(dirPath)
    } catch {
      return this.ensureDir(dirPath)
    }

    items.forEach(item => this.removePath(path.join(dirPath, item)))
  }

  async emptyDirAsync(dirPath: string): Promise<void> {
    let items
    try {
      items = await fsp.readdir(dirPath)
    } catch {
      return await this.ensureDirAsync(dirPath)
    }

    await Promise.all(items.map(item => this.removePathAsync(path.join(dirPath, item))))
  }

  /**
   * Cautious, underlying Node function is currently Experimental.
   */
  copyPath(src: string, dest: string, opt?: fs.CopySyncOptions): void {
    fs.cpSync(src, dest, {
      recursive: true,
      ...opt,
    })
  }

  /**
   * Cautious, underlying Node function is currently Experimental.
   */
  async copyPathAsync(src: string, dest: string, opt?: fs.CopyOptions): Promise<void> {
    await fsp.cp(src, dest, {
      recursive: true,
      ...opt,
    })
  }

  renamePath(src: string, dest: string): void {
    fs.renameSync(src, dest)
  }

  async renamePathAsync(src: string, dest: string): Promise<void> {
    await fsp.rename(src, dest)
  }

  movePath(src: string, dest: string, opt?: fs.CopySyncOptions): void {
    this.copyPath(src, dest, opt)
    this.removePath(src)
  }

  async movePathAsync(src: string, dest: string, opt?: fs.CopyOptions): Promise<void> {
    await this.copyPathAsync(src, dest, opt)
    await this.removePathAsync(src)
  }

  // Re-export the whole fs/fsp, for the edge cases where they are needed
  fs = fs
  fsp = fsp

  // Re-export existing fs/fsp functions
  // rm/rmAsync are replaced with removePath/removePathAsync
  lstat = fs.lstatSync
  lstatAsync = fsp.lstat
  stat = fs.statSync
  statAsync = fsp.stat
  mkdir = fs.mkdirSync
  mkdirAsync = fsp.mkdir
  readdir = fs.readdirSync
  readdirAsync = fsp.readdir
  createWriteStream = fs.createWriteStream
  createReadStream = fs.createReadStream

  /*
  Returns a Readable of [already parsed] NDJSON objects.

  Replaces a list of operations:
  - requireFileToExist(inputPath)
  - fs.createReadStream
  - createUnzip (only if path ends with '.gz')
  - transformSplitOnNewline
  - transformJsonParse

  To add a Limit or Offset: just add .take() or .drop(), example:

  _pipeline([
    fs2.createReadStreamAsNDJSON().take(100),
    transformX(),
  ])
   */
  createReadStreamAsNDJSON<ROW = any>(inputPath: string): ReadableTyped<ROW> {
    requireFileToExist(inputPath)

    let stream: ReadableTyped<ROW> = fs
      .createReadStream(inputPath, {
        highWaterMark: 64 * 1024, // no observed speedup
      })
      .on('error', err => stream.emit('error', err))

    if (inputPath.endsWith('.gz')) {
      stream = stream.pipe(
        createUnzip({
          chunkSize: 64 * 1024, // speedup from ~3200 to 3800 rps!
        }),
      )
    }

    return stream.pipe(transformSplitOnNewline()).map(line => JSON.parse(line))
    // For some crazy reason .map is much faster than transformJsonParse!
    // ~5000 vs ~4000 rps !!!
    // .on('error', err => stream.emit('error', err))
    // .pipe(transformJsonParse<ROW>())
  }

  /*
  Returns an array of Transforms, so that you can ...destructure them at
  the end of the _pipeline.

  Replaces a list of operations:
  - transformToNDJson
  - createGzip (only if path ends with '.gz')
  - fs.createWriteStream
   */
  createWriteStreamAsNDJSON(outputPath: string): TransformTyped<any, any>[] {
    this.ensureFile(outputPath)

    return [
      transformToNDJson(),
      outputPath.endsWith('.gz')
        ? createGzip({
            // chunkSize: 64 * 1024, // no observed speedup
          })
        : undefined,
      fs.createWriteStream(outputPath, {
        // highWaterMark: 64 * 1024, // no observed speedup
      }),
    ].filter(_isTruthy) as TransformTyped<any, any>[]
  }
}

export const fs2 = new FS2()

export interface JsonOptions {
  spaces?: number
}

function stringify(data: any, opt?: JsonOptions): string {
  // If pretty-printing is enabled (spaces) - also add a newline at the end (to match our prettier config)
  return JSON.stringify(data, null, opt?.spaces) + (opt?.spaces ? '\n' : '')
}