dataplug-io/dataplug-http

View on GitHub
src/httpGetReader.ts

Summary

Maintainability
F
3 days
Test Coverage
// Copyright (C) 2017-2019 Brainbean Apps OU (https://brainbeanapps.com).
// License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

import _ from 'lodash'
import { Readable } from 'stream'
import request from 'request'
import logger from 'winston'
import { URL } from 'url'

/**
 * Readable stream that issues an HTTP GET request on the first read and pushes
 * the response body downstream, optionally piping it through a Transform first.
 *
 * Recognized options (all optional):
 * - `transform`: stream the response is piped into; its output becomes the
 *   output of this reader, and its readable object mode is mirrored.
 * - `query`: passed to the request as `qs`.
 * - `headers`: passed to the request as `headers`.
 * - `responseHandler(response, request)`: returns a truthy value to consume
 *   the response body, or a falsy value (or a promise resolving to a falsy
 *   value, e.g. after a back-off delay) to retry the request. Throwing fails
 *   the stream. Defaults to `HttpGetReader.defaultResponseHandler`.
 * - `abortOnError`: when true, request/transform errors are re-emitted as
 *   'error'; otherwise they are logged and the stream is ended.
 */
export default class HttpGetReader extends Readable {
  private url: string | URL
  private options: any
  private request: any

  private onRequestErrorHandler: (err: Error) => void
  private onRequestCloseHandler: () => void
  private onRequestEndHandler: () => void
  private onRequestDataHandler: (chunk: any) => void
  private onRequestResponseHandler: (response: any) => void
  private onTransformErrorHandler: (err: Error) => void
  private onTransformCloseHandler: () => void
  private onTransformEndHandler: () => void
  private onTransformDataHandler: (chunk: any) => void

  /**
   * @constructor
   * @param url HTTP service URL
   * @param options Options, see the class description; omitted keys fall back to defaults
   */
  constructor(url: string | URL, options: any = undefined) {
    const settings = Object.assign(
      {},
      {
        transform: undefined,
        query: undefined,
        headers: undefined,
        responseHandler: HttpGetReader.defaultResponseHandler,
        abortOnError: false,
      },
      options,
    )

    // Mirror the transform's readable object mode so that chunks it produces
    // can be pushed through this stream unchanged.
    super({
      objectMode:
        settings.transform && settings.transform._readableState
          ? settings.transform._readableState.objectMode
          : false,
    })

    this.url = url
    this.options = settings

    this.request = null

    // Stable function references are required so that removeListener() can
    // find the exact listeners that were registered.
    this.onRequestErrorHandler = (error?: Error) => this.onRequestError(error)
    this.onRequestCloseHandler = () => this.onRequestClose()
    this.onRequestEndHandler = () => this.onRequestEnd()
    this.onRequestDataHandler = (chunk: any) => this.onRequestData(chunk)
    this.onRequestResponseHandler = (response: any) =>
      this.onRequestResponse(response)
    this.onTransformErrorHandler = (error: Error) =>
      this.onTransformError(error)
    this.onTransformCloseHandler = () => this.onTransformClose()
    this.onTransformEndHandler = () => this.onTransformEnd()
    this.onTransformDataHandler = (chunk: any) => this.onTransformData(chunk)

    // The request is started lazily, on the first _read() call only.
    this.once('read', () => {
      this._safeStartRequest()
    })
  }

  /**
   * https://nodejs.org/api/stream.html#stream_readable_read_size_1
   *
   * Only signals that a read was requested; data is pushed from the request
   * (or transform) event handlers.
   * @override
   */
  _read(size: any): void {
    this.emit('read')
  }

  /**
   * https://nodejs.org/api/stream.html#stream_readable_destroy_err_callback
   *
   * Detaches all listeners and destroys the in-flight request, if any.
   */
  _destroy(err: any, callback: (err: any) => void): void {
    // Capture first: detaching clears this.request
    const inFlight = this.request
    this.detachFromStreams()
    if (inFlight) {
      inFlight.destroy(err)
    }

    callback(err)
  }

  /**
   * Starts the request and subscribes to its 'error', 'close' and 'response'
   * events. 'data' and 'end' are subscribed later, once the response handler
   * decides that the body should be consumed.
   */
  _startRequest(): void {
    const options = {
      url: this.url,
      qs: this.options.query || {},
      headers: this.options.headers || {},
      gzip: true,
    }

    logger.log('verbose', `Starting HTTP GET request to ${options.url}`)
    logger.log('debug', 'Headers', options.headers)
    logger.log('debug', 'Query', options.qs)

    this.request = request(options)
    this.request
      .on('error', this.onRequestErrorHandler)
      .on('close', this.onRequestCloseHandler)
      .on('response', this.onRequestResponseHandler)
  }

  /**
   * Starts the request; a synchronous failure is logged, emitted as 'error'
   * and ends the stream.
   */
  _safeStartRequest(): void {
    try {
      this._startRequest()
    } catch (error) {
      logger.log(
        'error',
        `Error while starting HTTP GET request to '${this.url}'`,
        error,
      )
      this.endWithError(error)
    }
  }

  /**
   * Handles request error: re-emits it when `abortOnError` is set, otherwise
   * finishes the stream gracefully.
   */
  private onRequestError(error: any = undefined): void {
    logger.log(
      'error',
      `Error while making HTTP GET request to '${this.url}'`,
      error,
    )

    if (this.options.abortOnError) {
      this.emit('error', error)
    } else {
      this.finishRequest()
    }
  }

  /**
   * Handles request close
   */
  private onRequestClose(): void {
    this.finishRequest()
  }

  /**
   * Handles request end
   */
  private onRequestEnd(): void {
    logger.log('verbose', `HTTP GET request to ${this.url} complete`)

    this.finishRequest()
  }

  /**
   * Handles request data. With a transform configured the chunk reaches the
   * transform through pipe(), so it is pushed here only when there is none.
   */
  private onRequestData(chunk: any): void {
    logger.log(
      'debug',
      `Received data while making HTTP GET request to ${this.url}`,
    )
    logger.log('silly', 'Chunk', _.toString(chunk))

    if (!this.options.transform) {
      this.push(chunk)
    }
  }

  /**
   * Handles request response: asks the response handler whether the body
   * should be consumed, and either wires up the data flow or schedules a retry.
   */
  private onRequestResponse(response: any): void {
    logger.log(
      'debug',
      `Received response while making HTTP GET request to ${this.url}`,
    )
    logger.log('debug', 'Response HTTP version', response.httpVersion)
    logger.log('debug', 'Response headers', response.headers)
    logger.log('debug', 'Response status code', response.statusCode)
    logger.log('debug', 'Response status message', response.statusMessage)

    let verdict: any
    try {
      verdict = this.options.responseHandler(response, this.request)
    } catch (error) {
      logger.log(
        'error',
        `Error while handling response to HTTP GET request to '${this.url}'`,
        error,
      )

      this.endWithError(error)
      return
    }

    // A promise is always truthy, so it must be recognized explicitly:
    // a deferred verdict means "retry once the promise settles".
    const isDeferred = verdict != null && typeof verdict.then === 'function'

    if (verdict && !isDeferred) {
      logger.log(
        'debug',
        `Processing response data from paged HTTP GET request to '${this.url}'`,
        this.url,
      )
      // 'close' is already subscribed in _startRequest(); subscribing again
      // would leave a stale listener behind after detaching.
      this.request
        .on('data', this.onRequestDataHandler)
        .on('end', this.onRequestEndHandler)
      const transform = this.options.transform
      if (transform) {
        transform
          .on('data', this.onTransformDataHandler)
          .on('error', this.onTransformErrorHandler)
          .on('close', this.onTransformCloseHandler)
          .on('end', this.onTransformEndHandler)
        this.request.pipe(transform)
      }
      return
    }

    // Only the request is dropped: the transform must survive for the retry.
    this.detachFromRequest()
    logger.log('debug', `Going to retry HTTP GET request to ${this.url}`)
    Promise.resolve(verdict)
      .then(resolved => {
        if (resolved) {
          throw new Error('responseHandler should not return promised true')
        }

        // The stream may have been destroyed while waiting for the retry
        if (this.destroyed) {
          return
        }

        logger.log('debug', `Retrying HTTP GET request to ${this.url}`)
        this._startRequest()
      })
      .catch(error => {
        logger.log(
          'error',
          `Error while waiting for retrying of HTTP GET request to '${
            this.url
          }'`,
          error,
        )

        this.endWithError(error)
      })
  }

  /**
   * Handles transform error: re-emits it when `abortOnError` is set, otherwise
   * ends the transform so that the stream finishes.
   */
  private onTransformError(error: Error): void {
    logger.log(
      'error',
      `Error while transforming data from HTTP GET request to '${this.url}'`,
      error,
    )

    if (this.options.abortOnError) {
      this.emit('error', error)
    } else {
      this.options.transform.end()
    }
  }

  /**
   * Handles transform close
   */
  private onTransformClose(): void {
    this.push(null)
    this.detachFromStreams()
  }

  /**
   * Handles transform end
   */
  private onTransformEnd(): void {
    logger.log(
      'verbose',
      `Transformation of data from HTTP GET request to ${this.url} ended`,
    )

    this.push(null)
    this.detachFromStreams()
  }

  /**
   * Handles transform data
   */
  private onTransformData(chunk: any): void {
    logger.log('debug', `Transformed data from HTTP GET request to ${this.url}`)
    logger.log('silly', 'Chunk', chunk)

    this.push(chunk)
  }

  /**
   * Finishes after the request is over: with a transform, ends it (the stream
   * is completed by the transform's own 'end'/'close'); without one, detaches
   * and signals end-of-stream directly.
   */
  private finishRequest(): void {
    const transform = this.options.transform
    if (transform) {
      transform.end()
      return
    }

    this.detachFromStreams()
    this.push(null)
  }

  /**
   * Emits the error, detaches from all streams and signals end-of-stream.
   */
  private endWithError(error: any): void {
    this.emit('error', error)
    this.detachFromStreams()
    this.push(null)
  }

  /**
   * Detach from streams: removes every listener this reader registered and
   * forgets both the request and the transform.
   */
  private detachFromStreams(): void {
    logger.log(
      'verbose',
      `Detaching from streams of HTTP GET request to ${this.url}`,
    )

    this.detachFromRequest()
    this.detachFromTransform()
  }

  /**
   * Removes this reader's listeners from the current request and forgets it.
   */
  private detachFromRequest(): void {
    const current = this.request
    if (!current) {
      return
    }

    current.removeListener('data', this.onRequestDataHandler)
    current.removeListener('error', this.onRequestErrorHandler)
    current.removeListener('close', this.onRequestCloseHandler)
    current.removeListener('end', this.onRequestEndHandler)
    current.removeListener('response', this.onRequestResponseHandler)
    this.request = null
  }

  /**
   * Removes this reader's listeners from the transform and forgets it.
   */
  private detachFromTransform(): void {
    const transform = this.options.transform
    if (!transform) {
      return
    }

    transform.removeListener('data', this.onTransformDataHandler)
    transform.removeListener('error', this.onTransformErrorHandler)
    transform.removeListener('close', this.onTransformCloseHandler)
    transform.removeListener('end', this.onTransformEndHandler)
    this.options.transform = null
  }

  /**
   * Default response handler
   *
   * @param response Response object; `statusCode` and `statusMessage` are read
   * @param request Request object; `method` is read for the error message
   * @returns true for status 200 and 404 (consume the body), false for 429 (retry)
   * @throws Error for any other status code
   */
  static defaultResponseHandler(response: any, request: any): boolean {
    if (response.statusCode === 200 || response.statusCode === 404) {
      return true
    }

    if (response.statusCode === 429) {
      return false
    }

    // The method belongs to the request; a client-side response does not
    // carry a meaningful `method` of its own.
    const method = (request && request.method) || response.method || 'GET'
    throw new Error(
      `HTTP ${method} request failed with ${
        response.statusCode
      }: ${response.statusMessage || 'no details'}`,
    )
  }
}