dataplug-io/dataplug

View on GitHub
src/counter.ts

Summary

Maintainability
A
2 hrs
Test Coverage
// Copyright (C) 2017-2019 Brainbean Apps OU (https://brainbeanapps.com).
// License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

import { forOwn, isArray, findLast, isFunction } from 'lodash'
import check from 'check-types'
import { Transform } from 'stream'
import { Promise } from 'bluebird'
import * as logger from 'winston'
import { CounterOptions } from './counterOptions'

/**
 * Object-mode transform stream that counts the chunks passing through it and
 * reports the running totals through a logger callback.
 *
 * Each chunk is handed to the configured `counter` callback together with a
 * `count(name?)` function; every `count` call increments the counter with the
 * given name (the unnamed counter `''` when no name is passed). Chunks are
 * forwarded downstream unchanged.
 *
 * The `thresholds`, `prefixComponents` and `suffixComponents` options may each
 * be an array (applied to every counter) or an object keyed by counter name.
 * The `logger` option may be a function (used for every counter) or an object
 * of functions keyed by counter name.
 */
export default class Counter extends Transform {
  // NOTE(review): no initializer is visible in this class body — presumably
  // the defaults are assigned elsewhere; confirm before relying on them.
  static DEFAULT_OPTIONS: {
    counter: (data: any, count: (...args: any[]) => void) => void
    logger: (...args: any[]) => void
    prefixComponents: string[]
    suffixComponents: string[]
    thresholds: number[]
    abortOnError: boolean
  }
  /** Effective options: defaults overlaid with the caller's options. */
  private readonly _options: any
  /** Bound `count` function handed to the `counter` callback. */
  private readonly _defaultCount: (...args: any[]) => void
  /** Running totals keyed by counter name. */
  private readonly _counters: Record<string, number>
  /**
   * @constructor
   * @param options Processed counter options; merged over
   *   `Counter.DEFAULT_OPTIONS` and asserted to be shaped like them
   */
  constructor(options?: {} | CounterOptions) {
    const merged = Object.assign({}, Counter.DEFAULT_OPTIONS, options)
    check.assert.like(merged, Counter.DEFAULT_OPTIONS)

    super({
      objectMode: true,
    })

    this._options = merged

    // A null-prototype dictionary keeps counter names such as 'constructor',
    // 'toString' or '__proto__' from resolving to inherited Object.prototype
    // members, which would otherwise corrupt the very first increment.
    this._counters = Object.create(null)
    this._defaultCount = (counter?: string) => this._count(counter)
  }

  /**
   * Runs the `counter` callback for the chunk, prints any counters that are
   * due, and forwards the chunk unchanged.
   *
   * If counting or printing fails, the error is logged through winston; the
   * stream is failed with that error only when `abortOnError` is set,
   * otherwise the chunk is still forwarded.
   *
   * https://nodejs.org/api/stream.html#stream_transform_transform_chunk_encoding_callback
   * @override
   * @param chunk Object to count and pass through
   * @param encoding Unused
   * @param callback Stream callback, invoked as `callback(error, chunk)`
   */
  _transform(
    chunk: Buffer | string | any,
    encoding: string,
    callback: Function,
  ) {
    // Promise.try turns a synchronous throw from the user-supplied counter
    // into a rejection, so both failure modes end up in the same handler.
    Promise.try(() => this._options.counter(chunk, this._defaultCount))
      .then(() => {
        this._print()
        callback(null, chunk)
      })
      .catch(error => {
        logger.log('error', 'Error in Counter:', error)
        callback(this._options.abortOnError ? error : null, chunk)
      })
  }

  /**
   * Prints the final value of every counter, ignoring thresholds, before the
   * stream ends.
   *
   * https://nodejs.org/api/stream.html#stream_transform_flush_callback
   * @override
   * @param callback Stream callback, invoked without arguments
   */
  _flush(callback: Function) {
    this._print(false)
    callback()
  }

  /**
   * Prints current counters to log.
   *
   * Each counter is logged as `logger(...prefixComponents, value,
   * ...suffixComponents)`.
   *
   * @param checkThresholds When true (the default), a counter is printed only
   *   if its value is an exact multiple of its active threshold; when false,
   *   every counter is printed unconditionally
   */
  _print(checkThresholds = true) {
    forOwn(this._counters, (value: number, counter: string) => {
      const thresholds = isArray(this._options.thresholds)
        ? this._options.thresholds
        : this._options.thresholds[counter]

      // Active threshold: the last listed entry that the value has already
      // reached (the largest such entry when the list is ascending). Counters
      // without a configured list are reported on every value.
      const threshold = findLast(
        thresholds || [1],
        (candidate: number) => value >= candidate,
      )

      // `undefined` means no listed threshold has been reached yet; a zero
      // threshold yields NaN from the modulo and is likewise never "due".
      if (
        checkThresholds &&
        (threshold === undefined || value % threshold !== 0)
      ) {
        return
      }

      const prefixComponents = isArray(this._options.prefixComponents)
        ? this._options.prefixComponents
        : this._options.prefixComponents[counter]

      const suffixComponents = isArray(this._options.suffixComponents)
        ? this._options.suffixComponents
        : this._options.suffixComponents[counter]

      // Named `log` so it does not shadow the module-level winston import.
      const log = isFunction(this._options.logger)
        ? this._options.logger
        : this._options.logger[counter]

      log(...(prefixComponents || []), value, ...(suffixComponents || []))
    })
  }

  /**
   * Increments a counter by one, creating it at 1 on first use.
   *
   * @param counter Counter name; defaults to the unnamed counter `''`.
   *   Asserted to be a string via `check.assert.string`.
   */
  _count(counter = '') {
    check.assert.string(counter)
    this._counters[counter] = (this._counters[counter] || 0) + 1
  }
}