dataplug-io/dataplug

View on GitHub
src/filter.ts

Summary

Maintainability
A
2 hrs
Test Coverage
// Copyright (C) 2017-2019 Brainbean Apps OU (https://brainbeanapps.com).
// License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

import { isError } from 'lodash'
import { Transform } from 'stream'
import { Promise } from 'bluebird'
import * as logger from 'winston'

/**
 * Object-mode transform stream that forwards only the chunks accepted by a
 * filter callback.
 *
 * The callback is invoked once per chunk and may answer synchronously or with
 * a promise. A truthy answer forwards the chunk unchanged; a falsy answer
 * drops it. A failure (the callback throws, rejects, or returns an Error
 * instance) always drops the chunk and is additionally reported to the stream
 * when `abortOnError` is set.
 */
export default class Filter extends Transform {
  private readonly _filterCallback: Function
  private readonly _abortOnError: boolean

  /**
   * @constructor
   *
   * @param filterCallback Called with each chunk; returns (or resolves to) a
   *   truthy value to keep the chunk and a falsy value to drop it
   * @param abortOnError True if error in filtering should emit error, false otherwise
   */
  constructor(filterCallback: Function, abortOnError: boolean = false) {
    super({
      objectMode: true,
    })

    this._filterCallback = filterCallback
    this._abortOnError = abortOnError
  }

  /**
   * https://nodejs.org/api/stream.html#stream_transform_transform_chunk_encoding_callback
   * @override
   *
   * @param chunk Chunk handed to the filter callback and forwarded as-is when accepted
   * @param encoding Unused; chunks are never re-encoded
   * @param callback Stream callback, invoked exactly once as `callback(error, data)`
   */
  _transform(
    chunk: Buffer | string | any,
    encoding: string,
    callback: Function,
  ) {
    // Drops the chunk and, only when configured to abort, hands the failure to
    // the stream.
    const drop = (reason: unknown) => {
      if (!this._abortOnError) {
        callback(null, null)
        return
      }

      // The stream callback treats a falsy first argument as "no error", so a
      // non-Error reason (e.g. `undefined` or a string) is wrapped to make
      // sure the abort is actually signalled.
      callback(isError(reason) ? reason : new Error(String(reason)), null)
    }

    // The executor runs synchronously, so a throwing callback becomes a
    // rejection and a returned promise is adopted.
    new Promise<unknown>(resolve => resolve(this._filterCallback(chunk))).then(
      result => {
        // A callback that *returns* an Error instance is treated as a failure
        // too (without logging), as before.
        if (isError(result)) {
          drop(result)
          return
        }

        callback(null, result ? chunk : null)
      },
      reason => {
        // Failures are handled separately from results so that a thrown
        // non-Error value can never be mistaken for a truthy "keep" answer.
        logger.log('error', 'Error in Filter', reason)
        drop(reason)
      },
    )
  }
}