JuMastro/node-rotation-file

View on GitHub
src/index.js

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
const fs = require('fs')
const path = require('path')
const { Writable } = require('stream')
const { ensureOptions } = require('./options.js')
const { makeStreamRotation } = require('./rotation.js')

/**
 * The RotationFileStream constructor options object.
 * @typedef {object} RotationFileStream~Options
 * @prop {string} path - The file path location.
 * @prop {null|number|string} [maxSize='10m'] - The size as integer number, string tag or null.
 * @prop {null|number|string} [maxTime='1D'] - The time as integer number, string tag or null.
 * @prop {null|number} [maxArchives=14] - The number of file to keep in history.
 * @prop {null|string} [archivesDirectory=dirname(path)] - The directory location where archives are stored.
 * @prop {null|string} [compressType='gzip'] - The type of compression.
 */

module.exports = class RotationFileStream extends Writable {
  /**
   * Check, init and set properties and listeners.
   * @param {RotationFileStream:Options} options - The {@link RotationFileStream:Options} options object.
   */
  constructor (options = {}) {
    const opts = ensureOptions(options)
    super(opts)

    this.path = opts.path
    this.compressType = opts.compressType
    this.maxSize = opts.maxSize
    this.maxTime = opts.maxTime
    this.maxArchives = opts.maxArchives
    this.archivesDirectory = opts.archivesDirectory

    this.birthtime = null
    this.size = null
    this.error = null
    this.writer = null
    this.queue = []

    this.writing = false
    this.rotating = false
    this.ending = false
    this.ended = false

    this.once('init', this._init)
    this.once('error', this._error)
    this.on('open', this._open)
    this.on('ready', this._drain)
    this.on('close', this._close)
    this.on('drain', this._drain)
    this.on('rotate', makeStreamRotation.bind(this))

    setImmediate(() => this.emit('init'))
  }

  /**
   * Check if the stream is writable.
   * @returns {boolean}
   */
  isWritable () {
    return this.writer !== null && !this.writing && !this.rotating
  }

  /**
   * Starts to ending the stream, and write a last chunk if once provided.
   * See: https://nodejs.org/api/stream.html#stream_writable_end_chunk_encoding_callback
   * @param {Buffer|string|any} chunk - Last chunk to add to queue.
   * @param {string} encoding - Chunk encoding for string chunks.
   * @param {Function} nextEvent - Function is called once the chunk has been written.
   * @returns {void}
   */
  end (chunk, encoding, nextEvent) {
    this.ending = true

    if (chunk) {
      this.prependOnceListener('close', () => {
        this.queue.push({ chunk, nextEvent })
        this._drain()
      })
    }

    this._drain()
  }

  /**
   * Run the timeout rotation.
   * @returns {void}
   */
  _createTimeoutRotation () {
    const limit = this.maxTime - (new Date().getTime() - this.birthtime.getTime())
    const timeout = setTimeout(() => this.emit('rotate'), limit)
    this.once('close', () => clearTimeout(timeout))
  }

  /**
   * Init the stream.
   * Prepares the directories of `path` and `archivesDirectory`.
   * @returns {void}
   */
  async _init () {
    await fs.promises.mkdir(path.dirname(this.path), { recursive: true })
    await fs.promises.mkdir(this.archivesDirectory, { recursive: true })
    this.emit('open')
  }

  /**
   * Open the sub-writer stream.
   * @returns {void}
   */
  _open () {
    const writer = fs.createWriteStream(this.path, { flags: 'a' })
    writer.once('error', (err) => this.emit('error', err))
    writer.once('open', async () => {
      const { birthtime, size } = await fs.promises.stat(this.path)
      Object.assign(this, { birthtime, size, writer, ended: false })
      this.emit('ready')

      if (this.maxTime) {
        this._createTimeoutRotation()
      }
    })
  }

  /**
   * Close the writer stream.
   * @param {Function} next - Event triggered after the closing of the writer.
   * @returns {void}
   */
  _close (next) {
    if (this.writer) {
      this.writer.on('finish', next)
      this.writer.end()
      this.writer = null
    } else {
      next()
    }
  }

  /**
   * Store an error and start the end of stream.
   * @param {Object|Error} err
   * @returns {void}
   */
  _error (err) {
    this.error = err
    this.end()
  }

  /**
   * Attempt to consume pending queued chunks.
   * @returns {void}
   */
  _drain () {
    if (!this.isWritable()) {
      return
    }

    if (!this.queue.length) {
      if (this.ending) {
        this.emit('close', () => {
          this.ending = false
          this.ended = true
          this.emit('finish')
        })
      }

      return
    }

    if (this.maxSize && this.maxSize <= this.size) {
      this.emit('rotate')
      return
    }

    this._consumePendingChunk()
  }

  _consumePendingChunk () {
    const item = this.queue.shift()
    this.writing = true
    this.writer.write(item.chunk, (err) => {
      this.writing = false
      this.size += item.chunk.length

      if (err) {
        this.emit('error', err)
      }

      if (item.nextEvent) {
        item.nextEvent(err)
      }

      this._drain()
    })
  }

  /**
   * Send chunk to the underlying resource.
   * See: https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1
   * @param {Buffer|string|any} chunk - Chunk to add to queue.
   * @param {string} encoding - Chunk encoding for string chunks.
   * @param {Function} nextEvent - Function is called once the chunk has been written.
   * @returns {void}
   */
  _write (chunk, encoding, nextEvent) {
    this.queue.push({ chunk, nextEvent })
    this._drain()
  }

  /**
   * Send chunks to the underlying resource.
   * Used when several chunks are written together, so they will be processed by group to be more efficient.
   * See: https://nodejs.org/api/stream.html#stream_writable_writev_chunks_callback
   * @param {Object[]} chunk - Chunks to add to queue.
   * @param {Function} nextEvent - Function is called once the chunks have been written.
   * @returns {void}
   */
  _writev (chunks, nextEvent) {
    Object.assign(chunks[chunks.length - 1], { nextEvent })
    this.queue = this.queue.concat(chunks)
    this._drain()
  }
}