src/mapper.ts
// Copyright (C) 2017-2019 Brainbean Apps OU (https://brainbeanapps.com).
// License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
import { Transform } from 'stream'
import { Promise } from 'bluebird'
import * as logger from 'winston'
import { MapperOptions } from './mapperOptions'
/**
* Maps input stream to derivative readable stream
*/
export default class Mapper extends Transform {
/**
* @param inputObjectMode True if input data is object stream, false if raw data chunks
* @param outputObjectMode True if output data is object stream, false if raw data chunks
* @param abortOnError True if error in derivative stream should emit error, false otherwise
*/
static DEFAULT_OPTIONS: MapperOptions = {
inputObjectMode: false,
outputObjectMode: false,
abortOnError: false,
}
private _factory: any
private _options: MapperOptions
private _stream: Transform | null
private _notifyTransformComplete: Function | null
private readonly _onStreamEndHandler: () => void
private readonly _onStreamCloseHandler: () => void
private readonly _onStreamErrorHandler: (error: Error) => void
private readonly _onStreamDataHandler: (chunk: any) => void
/**
* @constructor
*
* @param factory Derivative stream factory
* @param options Mapping options
*/
constructor(factory: Function, options?: MapperOptions) {
options = Object.assign({}, Mapper.DEFAULT_OPTIONS, options)
super({
objectMode: false,
readableObjectMode: options.outputObjectMode,
writableObjectMode: options.inputObjectMode,
})
this._factory = factory
this._options = Object.assign({}, Mapper.DEFAULT_OPTIONS, options)
this._stream = null
this._notifyTransformComplete = null
this._onStreamEndHandler = () => this._onStreamEnd()
this._onStreamCloseHandler = () => this._onStreamClose()
this._onStreamErrorHandler = error => this._onStreamError(error)
this._onStreamDataHandler = chunk => this._onStreamData(chunk)
}
/**
* https://nodejs.org/api/stream.html#stream_transform_transform_chunk_encoding_callback
* @override
*/
_transform(
chunk: Buffer | string | any,
encoding: string,
callback: Function,
) {
Promise.try(() => this._factory(chunk))
.catch(error => {
logger.log('error', 'Error in Mapper while creating stream:', error)
if (this._options.abortOnError) {
throw error
}
})
.then(stream => {
this._stream = stream
this._notifyTransformComplete = callback
if (!this._stream) {
callback()
return
}
this._stream
.on('end', this._onStreamEndHandler)
.on('close', this._onStreamCloseHandler)
.on('error', this._onStreamErrorHandler)
.on('data', this._onStreamDataHandler)
})
.catch(error => {
logger.log('error', 'Error in Mapper:', error)
callback(error)
})
}
/**
* https://nodejs.org/api/stream.html#stream_writable_destroy_err_callback
* @override
*/
_destroy(error: Error, callback: any) {
if (this._stream) {
this._detachFromStream()
this._stream.destroy()
this._stream = null
}
super._destroy(error, callback)
}
/**
* Detaches from current stream
*/
_detachFromStream() {
if (this._stream) {
this._stream.removeListener('end', this._onStreamEndHandler)
this._stream.removeListener('close', this._onStreamCloseHandler)
this._stream.removeListener('error', this._onStreamErrorHandler)
this._stream.removeListener('data', this._onStreamDataHandler)
}
}
/**
* Handles stream end event
*/
_onStreamEnd() {
this._detachFromStream()
if (this._notifyTransformComplete) {
this._notifyTransformComplete()
}
}
/**
* Handles stream close event
*/
_onStreamClose() {
this._detachFromStream()
if (this._notifyTransformComplete) {
this._notifyTransformComplete()
}
}
/**
* Handles stream error event
*/
_onStreamError(error: Error) {
logger.log('error', 'Error in Mapper derivative stream:', error)
this._detachFromStream()
if (this._notifyTransformComplete) {
this._notifyTransformComplete(this._options.abortOnError ? error : null)
}
}
/**
* Handles stream readable event
*/
_onStreamData(chunk: any) {
this.push(chunk)
}
}