dataplug-io/dataplug

View on GitHub
src/mapper.ts

Summary

Maintainability
B
4 hrs
Test Coverage
// Copyright (C) 2017-2019 Brainbean Apps OU (https://brainbeanapps.com).
// License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

import { Transform } from 'stream'
import { Promise } from 'bluebird'
import * as logger from 'winston'
import { MapperOptions } from './mapperOptions'

/**
 * Maps input stream to derivative readable stream
 */
export default class Mapper extends Transform {
  /**
   * @param inputObjectMode True if input data is object stream, false if raw data chunks
   * @param outputObjectMode True if output data is object stream, false if raw data chunks
   * @param abortOnError True if error in derivative stream should emit error, false otherwise
   */
  static DEFAULT_OPTIONS: MapperOptions = {
    inputObjectMode: false,
    outputObjectMode: false,
    abortOnError: false,
  }
  private _factory: any
  private _options: MapperOptions
  private _stream: Transform | null
  private _notifyTransformComplete: Function | null
  private readonly _onStreamEndHandler: () => void
  private readonly _onStreamCloseHandler: () => void
  private readonly _onStreamErrorHandler: (error: Error) => void
  private readonly _onStreamDataHandler: (chunk: any) => void

  /**
   * @constructor
   *
   * @param factory Derivative stream factory
   * @param options Mapping options
   */
  constructor(factory: Function, options?: MapperOptions) {
    options = Object.assign({}, Mapper.DEFAULT_OPTIONS, options)

    super({
      objectMode: false,
      readableObjectMode: options.outputObjectMode,
      writableObjectMode: options.inputObjectMode,
    })

    this._factory = factory
    this._options = Object.assign({}, Mapper.DEFAULT_OPTIONS, options)

    this._stream = null
    this._notifyTransformComplete = null

    this._onStreamEndHandler = () => this._onStreamEnd()
    this._onStreamCloseHandler = () => this._onStreamClose()
    this._onStreamErrorHandler = error => this._onStreamError(error)
    this._onStreamDataHandler = chunk => this._onStreamData(chunk)
  }

  /**
   * https://nodejs.org/api/stream.html#stream_transform_transform_chunk_encoding_callback
   * @override
   */
  _transform(
    chunk: Buffer | string | any,
    encoding: string,
    callback: Function,
  ) {
    Promise.try(() => this._factory(chunk))
      .catch(error => {
        logger.log('error', 'Error in Mapper while creating stream:', error)
        if (this._options.abortOnError) {
          throw error
        }
      })
      .then(stream => {
        this._stream = stream
        this._notifyTransformComplete = callback
        if (!this._stream) {
          callback()
          return
        }
        this._stream
          .on('end', this._onStreamEndHandler)
          .on('close', this._onStreamCloseHandler)
          .on('error', this._onStreamErrorHandler)
          .on('data', this._onStreamDataHandler)
      })
      .catch(error => {
        logger.log('error', 'Error in Mapper:', error)
        callback(error)
      })
  }

  /**
   * https://nodejs.org/api/stream.html#stream_writable_destroy_err_callback
   * @override
   */
  _destroy(error: Error, callback: any) {
    if (this._stream) {
      this._detachFromStream()
      this._stream.destroy()
      this._stream = null
    }

    super._destroy(error, callback)
  }

  /**
   * Detaches from current stream
   */
  _detachFromStream() {
    if (this._stream) {
      this._stream.removeListener('end', this._onStreamEndHandler)
      this._stream.removeListener('close', this._onStreamCloseHandler)
      this._stream.removeListener('error', this._onStreamErrorHandler)
      this._stream.removeListener('data', this._onStreamDataHandler)
    }
  }

  /**
   * Handles stream end event
   */
  _onStreamEnd() {
    this._detachFromStream()
    if (this._notifyTransformComplete) {
      this._notifyTransformComplete()
    }
  }

  /**
   * Handles stream close event
   */
  _onStreamClose() {
    this._detachFromStream()
    if (this._notifyTransformComplete) {
      this._notifyTransformComplete()
    }
  }

  /**
   * Handles stream error event
   */
  _onStreamError(error: Error) {
    logger.log('error', 'Error in Mapper derivative stream:', error)

    this._detachFromStream()
    if (this._notifyTransformComplete) {
      this._notifyTransformComplete(this._options.abortOnError ? error : null)
    }
  }

  /**
   * Handles stream readable event
   */
  _onStreamData(chunk: any) {
    this.push(chunk)
  }
}