dataplug-io/dataplug-http

View on GitHub
src/pagedHttpGetReader.ts

Summary

Maintainability
F
5 days
Test Coverage
// Copyright (C) 2017-2019 Brainbean Apps OU (https://brainbeanapps.com).
// License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

import _ from 'lodash'
import { Readable } from 'stream'
import Promise from 'bluebird'
import request from 'request'
import logger from 'winston'
import { URL } from 'url'
import HttpGetReader from './httpGetReader'

/**
 * Reads data from HTTP service URL that supports pagination, optionally altering the data using specified Transform
 *
 * Pages are fetched one at a time: when the current request (or, if a
 * transform factory is configured, the current transform) finishes, the
 * next-page functor is asked whether another page exists and, if so, a new
 * request is started. The stream ends when the functor reports no further page.
 */
export default class PagedHttpGetReader extends Readable {
  /** URL of the page currently being (or about to be) requested */
  private url: string | URL
  /** Next-page functor supplied to the constructor */
  private nextPage: any
  /** Effective options (defaults merged with the caller's options) */
  private options: any

  /** Query-string parameters of the current page (deep copy of options.query), or null */
  private query: any
  /** HTTP headers of the current page (deep copy of options.headers), or null */
  private headers: any
  /** In-flight request, or null when detached */
  private request: any
  /** Transform the in-flight request is piped into, or null */
  private transform: any

  // Bound handlers are stored so that exactly the same function instances can
  // later be passed to removeListener() in detachFromStreams().
  private onRequestErrorHandler: (err: Error) => void
  private onRequestCloseHandler: () => void
  private onRequestEndHandler: () => void
  private onRequestDataHandler: (chunk: any) => void
  private onRequestResponseHandler: (response: any) => void
  private onTransformErrorHandler: (err: Error) => void
  private onTransformCloseHandler: () => void
  private onTransformEndHandler: () => void
  private onTransformDataHandler: (chunk: any) => void
  private onTransformCompleteHandler: (data: any) => void

  /**
   * @constructor
   * @param {string|URL} url HTTP service URL
   * @param {PagedHttpGetReader~NextPageFunctor} nextPage Next-page functor. It is
   *   invoked as `nextPage(page, data)` where `page` is `{ url, query, headers }`
   *   of the page that just finished and `data` is the payload of the
   *   transform's 'complete' event (or null). It may modify `page` in place and
   *   must return (or resolve to) a truthy value when there is a next page.
   * @param {PagedHttpGetReader~Options} [options=] Options:
   *   - `transformFactory`: function creating a new Transform for each page
   *   - `query`: initial query-string parameters
   *   - `headers`: initial HTTP headers
   *   - `responseHandler`: invoked as `responseHandler(response, request)`;
   *     returning `true` processes the response data, any other value is
   *     treated as a (promised) falsy value after which the request is retried
   *   - `abortOnError`: when true, request/transform errors are emitted as
   *     'error' instead of moving on to the next page
   */
  constructor(url: string | URL, nextPage: any, options: any = undefined) {
    options = Object.assign(
      {},
      {
        transformFactory: undefined,
        query: undefined,
        headers: undefined,
        responseHandler: HttpGetReader.defaultResponseHandler,
        abortOnError: false,
      },
      options,
    )

    // A throw-away transform instance is created only to find out whether the
    // reader has to operate in object mode.
    const transform = options.transformFactory
      ? options.transformFactory()
      : null
    super({
      objectMode:
        transform && transform._readableState
          ? transform._readableState.objectMode
          : false,
    })

    this.url = url
    this.nextPage = nextPage
    this.options = options

    this.query = options.query ? _.cloneDeep(options.query) : null
    this.headers = options.headers ? _.cloneDeep(options.headers) : null
    this.request = null
    this.transform = null

    this.onRequestErrorHandler = (...args) => this.onRequestError(...args)
    this.onRequestCloseHandler = (...args) => this.onRequestClose(...args)
    this.onRequestEndHandler = (...args) => this.onRequestEnd(...args)
    this.onRequestDataHandler = (...args) => this.onRequestData(...args)
    this.onRequestResponseHandler = (...args) => this.onRequestResponse(...args)
    this.onTransformErrorHandler = (...args) => this.onTransformError(...args)
    this.onTransformCloseHandler = (...args) => this.onTransformClose(...args)
    this.onTransformEndHandler = (...args) => this.onTransformEnd(...args)
    this.onTransformDataHandler = (...args) => this.onTransformData(...args)
    this.onTransformCompleteHandler = (...args) =>
      this.onTransformComplete(...args)

    // The first request is started lazily, on the first _read() call.
    this.once('read', () => {
      this._safeStartRequest()
    })
  }

  /**
   * https://nodejs.org/api/stream.html#stream_readable_read_size_1
   *
   * Only signals (via the internal 'read' event) that reading has begun; data
   * is pushed from the request/transform event handlers.
   * @override
   */
  _read(size: any) {
    this.emit('read')
  }

  /**
   * https://nodejs.org/api/stream.html#stream_readable_destroy_err_callback
   *
   * Detaches from the in-flight request (and transform) and destroys the request.
   */
  _destroy(err: Error, callback: (err: Error) => void) {
    // Detach from current stream
    if (this.request) {
      // detachFromStreams() nulls this.request, so keep a reference to destroy
      const capturedRequest = this.request
      this.detachFromStreams()
      capturedRequest.destroy(err)
    }

    callback(err)
  }

  /**
   * Starts the request for the current url/query/headers and subscribes to its
   * 'error', 'close' and 'response' events
   */
  _startRequest(): void {
    const options = {
      url: this.url,
      qs: this.query || {},
      headers: this.headers || {},
      gzip: true,
    }

    logger.log('verbose', `Starting paged HTTP GET request to ${options.url}`)
    logger.log('debug', 'Headers', options.headers)
    logger.log('debug', 'Query', options.qs)

    this.request = request(options)
    this.request
      .on('error', this.onRequestErrorHandler)
      .on('close', this.onRequestCloseHandler)
      .on('response', this.onRequestResponseHandler)
  }

  /**
   * Safely starts the request: a synchronous failure is logged, emitted as
   * 'error' and ends the stream
   */
  _safeStartRequest(): void {
    try {
      this._startRequest()
    } catch (error) {
      logger.log(
        'error',
        `Error while starting HTTP GET request to '${this.url}'`,
        error,
      )
      this.emit('error', error)

      this.detachFromStreams()
      this.push(null)
    }
  }

  /**
   * Handles request error: emits it when abortOnError is set, otherwise
   * finishes the current page (directly or by ending the transform)
   */
  private onRequestError(error: Error): void {
    logger.log(
      'error',
      `Error while making paged HTTP GET request to '${this.url}'`,
      error,
    )

    if (this.options.abortOnError) {
      this.emit('error', error)
    } else {
      if (!this.transform) {
        this.proceedToNextPage(null)
      } else {
        this.transform.end()
      }
    }
  }

  /**
   * Handles request close: finishes the current page (directly or by ending
   * the transform)
   */
  private onRequestClose(): void {
    if (!this.transform) {
      this.proceedToNextPage(null)
    } else {
      this.transform.end()
    }
  }

  /**
   * Handles request end: finishes the current page (directly or by ending the
   * transform)
   */
  private onRequestEnd(): void {
    logger.log('verbose', `Paged HTTP GET request to ${this.url} complete`)

    if (!this.transform) {
      this.proceedToNextPage(null)
    } else {
      this.transform.end()
    }
  }

  /**
   * Handles request data: raw chunks are pushed downstream only when no
   * transform is in use (otherwise the transform's output is pushed instead)
   */
  private onRequestData(chunk: any): void {
    logger.log(
      'debug',
      `Received data while making paged HTTP GET request to ${this.url}`,
    )
    logger.log('silly', 'Chunk', _.toString(chunk))

    if (!this.transform) {
      this.push(chunk)
    }
  }

  /**
   * Handles request response: consults options.responseHandler and either
   * starts consuming the response body or schedules a retry of the request
   */
  private onRequestResponse(response: any): void {
    logger.log(
      'debug',
      `Received response while making paged HTTP GET request to ${this.url}`,
    )
    logger.log('debug', 'Response HTTP version', response.httpVersion)
    logger.log('debug', 'Response headers', response.headers)
    logger.log('debug', 'Response status code', response.statusCode)
    logger.log('debug', 'Response status message', response.statusMessage)

    let shouldProcessData: boolean
    try {
      shouldProcessData = this.options.responseHandler(response, this.request)
    } catch (error) {
      logger.log(
        'error',
        `Error while handling response to HTTP GET request to '${this.url}'`,
        error,
      )

      this.emit('error', error)
      this.detachFromStreams()
      this.push(null)
      return
    }

    if (shouldProcessData === true) {
      logger.log(
        'debug',
        `Processing response data from paged HTTP GET request to ${this.url}`,
      )
      // 'close' is intentionally NOT subscribed here: _startRequest() already
      // did. A second registration would run onRequestClose twice per 'close'
      // event, and because detachFromStreams() removes a single registration
      // per call, a stale listener would remain on the finished request.
      this.request
        .on('data', this.onRequestDataHandler)
        .on('end', this.onRequestEndHandler)
      if (this.options.transformFactory) {
        this.transform = this.options.transformFactory()
        this.transform
          .on('data', this.onTransformDataHandler)
          .on('error', this.onTransformErrorHandler)
          .on('close', this.onTransformCloseHandler)
          .on('end', this.onTransformEndHandler)
          .on('complete', this.onTransformCompleteHandler)
        this.request.pipe(this.transform)
      }
    } else {
      this.detachFromStreams()
      logger.log(
        'debug',
        `Going to retry paged HTTP GET request to ${this.url}`,
      )
      // The handler may return a promise (e.g. to delay the retry); only a
      // synchronous `true` means "process the data".
      Promise.resolve(shouldProcessData)
        .then(resolved => {
          if (resolved) {
            throw new Error('responseHandler should not return promised true')
          }

          // Do not start new requests once the reader has been destroyed
          if (this.destroyed) {
            return
          }

          logger.log('debug', `Retrying paged HTTP GET request to ${this.url}`)
          this._startRequest()
        })
        .catch(error => {
          logger.log(
            'error',
            `Error while waiting for retrying of paged HTTP GET request to '${
              this.url
            }'`,
            error,
          )

          this.emit('error', error)
          this.detachFromStreams()
          this.push(null)
        })
    }
  }

  /**
   * Handles transform error: emits it when abortOnError is set, otherwise ends
   * the transform
   */
  private onTransformError(error: Error): void {
    logger.log(
      'error',
      `Error while transforming data from paged HTTP GET request to '${
        this.url
      }'`,
      error,
    )

    if (this.options.abortOnError) {
      this.emit('error', error)
    } else {
      this.transform.end()
    }
  }

  /**
   * Handles transform close
   */
  private onTransformClose(): void {
    this.transform.end()
  }

  /**
   * Handles transform end: moves on to the next page without page data
   */
  private onTransformEnd(): void {
    logger.log(
      'verbose',
      `Transformation of data from paged HTTP GET request to ${this.url} ended`,
    )

    this.proceedToNextPage(null)
  }

  /**
   * Handles transform data: pushes the transformed chunk downstream
   */
  private onTransformData(chunk: any): void {
    logger.log(
      'debug',
      `Transformed data from paged HTTP GET request to ${this.url}`,
    )
    logger.log('silly', 'Chunk', chunk)

    this.push(chunk)
  }

  /**
   * Handles complete data from transform: moves on to the next page, handing
   * the data to the next-page functor
   */
  private onTransformComplete(data: any): void {
    logger.log(
      'verbose',
      `Transformation of data from paged HTTP GET request to ${
        this.url
      } is complete`,
    )
    logger.log('silly', 'Data', data)

    this.proceedToNextPage(data)
  }

  /**
   * Detach from streams: unsubscribes every handler from the current request
   * and transform and forgets both
   */
  private detachFromStreams(): void {
    logger.log(
      'verbose',
      `Detaching from streams of paged HTTP GET request to ${this.url}`,
    )

    if (this.request) {
      this.request.removeListener('data', this.onRequestDataHandler)
      this.request.removeListener('error', this.onRequestErrorHandler)
      this.request.removeListener('close', this.onRequestCloseHandler)
      this.request.removeListener('end', this.onRequestEndHandler)
      this.request.removeListener('response', this.onRequestResponseHandler)
      this.request = null
    }

    if (this.transform) {
      this.transform.removeListener('data', this.onTransformDataHandler)
      this.transform.removeListener('error', this.onTransformErrorHandler)
      this.transform.removeListener('close', this.onTransformCloseHandler)
      this.transform.removeListener('end', this.onTransformEndHandler)
      this.transform.removeListener('complete', this.onTransformCompleteHandler)
      this.transform = null
    }
  }

  /**
   * Proceed to next page: asks the next-page functor whether another page
   * exists; starts the next request if so, ends the stream otherwise (also
   * when the functor fails)
   *
   * @param {} data Data from current page
   */
  private proceedToNextPage(data: any): void {
    this.detachFromStreams()

    logger.log(
      'verbose',
      `Checking existence of next page of paged HTTP GET request to ${
        this.url
      }`,
    )

    // The functor receives this object and may modify it in place; its fields
    // are copied back below once the functor confirms there is a next page.
    const page = {
      url: this.url,
      query: this.query,
      headers: this.headers,
    }
    Promise.try(() => this.nextPage(page, data))
      .then(hasNextPage => {
        // Do not start new requests once the reader has been destroyed
        if (this.destroyed) {
          return
        }

        if (!hasNextPage) {
          logger.log(
            'verbose',
            `No page to proceed to of paged HTTP GET request to ${this.url}`,
          )

          this.push(null)
          return
        }

        logger.log(
          'verbose',
          `Proceeding to next page of paged HTTP GET request to ${this.url}`,
        )

        this.url = page.url
        this.query = page.query
        this.headers = page.headers
        this._startRequest()
      })
      .catch(error => {
        logger.log(
          'error',
          `Error in PagedHttpGetReader while checking existence of next page of paged HTTP GET request to ${
            this.url
          }`,
          error,
        )
        this.push(null)
      })
  }
}