lib/protocols/http/passes/forward.js

Summary

Maintainability
B
4 hrs
Test Coverage
const HttpProxy = require('http-proxy')
const parseUrl = require('url').parse
const Readable = require('stream').Readable
const retry = require('../retry')
const error = require('../../../error')
const helpers = require('../../../helpers')

module.exports = function forward (route, opts, req, res, done) {
  // Clone request object to avoid side-effects on mutations
  const forwardReq = helpers.cloneRequest(req, opts)

  // Run the forward phase middleware
  route.mw.run('forward', forwardReq, res, forwarder)

  function forwarder (err) {
    if (err) {
      route.emit('route:error', err, req, res)
      return done(error.reply(err, res))
    }

    const opts = forwardReq.rocky.options

    // Expose rocky params in the response for traceability
    res.rocky = forwardReq.rocky

    // Balance the request, if configured
    const balance = opts.balance
    if (balance && balance.length && !opts.target) {
      opts.target = helpers.permute(balance)
    }

    // Overwrite host header if necessary based on user options
    defineHostHeader(forwardReq, opts)

    // Reply with an error if target server was not defined
    if (!opts.target) {
      return done(missingTargetError(route, req, res))
    }

    forwardStrategy(route, opts, forwardReq, res, resolver)
  }

  function resolver (err) {
    if (err) {
      route.emit('proxy:error', err, forwardReq, res)
      return done(error.reply(err, res))
    }

    route.emit('proxy:response', forwardReq, res)
    done(null, res)
  }
}

function forwardStrategy (route, opts, req, res, resolver) {
  if (opts.retry) {
    forwardRetryRequest(route, opts, req, res, resolver)
  } else {
    forwardRequest(route, opts, req, res, resolver)
  }
}

function forwardRetryRequest (route, opts, forwardReq, res, done) {
  var closed = false
  var resolver = helpers.once(resolve)

  forwardReq.once('close', onClose)
  retry(opts.retry, res, task, resolver, onRetry)

  function onClose () {
    closed = true
  }

  function onRetry (err, res) {
    route.emit('proxy:retry', err, forwardReq, res)
  }

  function task (next) {
    if (closed) {
      resolver(new Error('Peer connection closed'))
      return next()
    }
    if (res.headersSent) {
      return next()
    }
    forwardRequest(route, opts, forwardReq, res, next)
  }

  function resolve (err) {
    forwardReq.removeListener('close', onClose)
    done(err)
  }
}

function defineHostHeader (forwardReq, opts) {
  const forwardHost = opts.forwardHost
  if (!forwardHost) return

  const headers = forwardReq.headers
  if (typeof forwardHost === 'string') {
    headers.host = forwardHost
    return forwardHost
  }

  const target = forwardReq.rocky.options.target || opts.target
  if (!target) return

  const host = target === Object(target)
    ? target.host
    : parseUrl(target).host

  if (host) headers.host = host
}

function forwardRequest (route, opts, forwardReq, res, done) {
  const proxy = new HttpProxy()

  // Use the proper request body
  useRequestBody(forwardReq, opts)

  // Prepare propagation of proxy events to parent bus
  propagateEvents(proxy, route, finisher)

  // Finally forward the request
  proxy.web(forwardReq, res, opts, finisher)

  function finisher () {
    cleanup(proxy)
    done.apply(null, arguments)
  }
}

function useRequestBody (forwardReq, opts) {
  var body = forwardReq.body

  // Forward original body instead of the intercepted one, if enabled
  if (opts.forwardOriginalBody && forwardReq._originalBody) {
    forwardReq.headers['content-length'] = forwardReq._originalBodyLength
    body = forwardReq._originalBody
  }

  // If body is present, use it as buffer stream
  if (body && (typeof body === 'string' || body instanceof Buffer)) {
    opts.buffer = createBodyStream(body)
  }
}

function propagateEvents (proxy, route, next) {
  proxy.on('proxyReq', function (proxyReq, req, res, options) {
    route.emit('proxyReq', proxyReq, req, res, options)
  })
  proxy.on('proxyRes', function (proxyRes, req, res) {
    route.emit('proxyRes', proxyRes, req, res)
    next(null, proxyRes)
  })
}

function cleanup (proxy) {
  proxy.removeAllListeners('proxyReq')
  proxy.removeAllListeners('proxyRes')
  proxy.removeAllListeners('error')
}

function createBodyStream (body) {
  const stream = new Readable()
  stream.push(body)
  stream.push(null)
  return stream
}

function missingTargetError (route, req, res) {
  const err = new Error('Cannot forward: missing target URL')
  route.emit('route:error', err, req, res)
  error.reply(err, res)
  return err
}