dataplug-io/dataplug

View on GitHub
src/scanner.ts

Summary

Maintainability
A
2 hrs
Test Coverage
// Copyright (C) 2017-2019 Brainbean Apps OU (https://brainbeanapps.com).
// License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

import { Transform } from 'stream'
import Promise from 'bluebird'
import * as logger from 'winston'

/**
 * Scans the object stream
 */
export default class Scanner extends Transform {
  private _scannerCallback: any
  private readonly _abortOnError: boolean
  /**
   * @constructor
   * @param scannerCallback
   * @param abortOnError True if error in filtering should emit error, false otherwise
   */
  constructor(scannerCallback: Function, abortOnError: boolean = false) {
    super({
      objectMode: true,
    })

    this._scannerCallback = scannerCallback
    this._abortOnError = abortOnError
  }

  /**
   * https://nodejs.org/api/stream.html#stream_transform_transform_chunk_encoding_callback
   * @override
   */
  _transform(
    chunk: Buffer | string | any,
    encoding: string,
    callback: Function,
  ) {
    Promise.try(() => this._scannerCallback(chunk))
      .catch(error => {
        logger.log('error', 'Error in Scanner', error)
        return error
      })
      .then(error => {
        callback(this._abortOnError ? error : null, chunk)
      })
  }
}