meteor/meteor

View on GitHub
tools/utils/http-helpers.js

Summary

Maintainability
F
3 days
Test Coverage
///
/// utility functions for dealing with urls and http
///

var os = require('os');
var util = require('util');

var _ = require('underscore');

var files = require('../fs/files');
var auth = require('../meteor-services/auth.js');
var config = require('../meteor-services/config.js');
var release = require('../packaging/release.js');
var Console = require('../console/console.js').Console;
var timeoutScaleFactor = require('./utils.js').timeoutScaleFactor;

import { Writable } from "stream";

class ConcatStream extends Writable {
  constructor() {
    super();
    this.chunks = [];
    this.size = 0;
  }

  _write(chunk, encoding, next) {
    this.chunks.push(chunk);
    this.size += chunk.length;
    next();
  }

  getBuffer() {
    if (this.chunks.length !== 1) {
      this.chunks[0] = Buffer.concat(this.chunks);
      this.chunks.length = 1;
    }

    return this.chunks[0];
  }

  end(force = false) {
    // Override the Writable#end method to ignore any .end() calls for
    // this stream, since we likely want to resume the download later.
    if (force === true) {
      super.end();
    }
  }
}

// Helper that tracks bytes written to a writable
var WritableWithProgress = function (writable, listener) {
  var self = this;
  self._inner = writable;
  self._listener = listener;
};

Object.assign(WritableWithProgress.prototype, {
  write: function (chunk, encoding, callback) {
    var self = this;
    self._listener(chunk.length, false);
    return self._inner.write(chunk, encoding);
  },

  end: function (chunk, encoding, callback) {
    var self = this;
    self._listener(chunk ? chunk.length : 0, true);
    return self._inner.end(chunk, encoding);
  },

  _progress: function (n, done) {
    var self = this;

    var state = self._state;
    state.current += n;
    if (done) {
      state.current.done = true;
    }
    self.progress.reportProgress(state);
  },

  on: function (name, callback) {
    return this._inner.on(name, callback);
  },

  once: function () {
    return this._inner.once(...arguments);
  },

  emit: function () {
    return this._inner.emit(...arguments);
  }
});


// Compose a User-Agent header.
var getUserAgent = function () {
  var version;

  if (release.current) {
    version = release.current.isCheckout() ? 'checkout' : release.current.name;
  } else {
    // This happens when we haven't finished starting up yet (say, the
    // user passed --release 1.2.3 and we have to download 1.2.3
    // before we can get going), or if we are using an installed copy
    // of Meteor to 'meteor update'ing a project that was created by a
    // checkout and doesn't have a version yet.
    version = files.inCheckout() ? 'checkout' : files.getToolsVersion();
  }

  return util.format('Meteor/%s OS/%s (%s; %s; %s;)', version,
                     os.platform(), os.type(), os.release(), os.arch());
};


var httpHelpers = exports;
Object.assign(exports, {
  getUserAgent: getUserAgent,

  // A wrapper around request with the following improvements:
  //
  // - It will respect proxy environment variables if present
  //   (HTTP_PROXY or HTTPS_PROXY as appropriate).
  //
  // - It will set a reasonable User-Agent header.
  //
  // - If you omit the callback it will run synchronously. The return
  //   value will be an object with keys 'response' and 'body' (with
  //   the same meaning as the arguments to request's normal
  //   callback), or it will throw.
  //
  // - If Set-Cookie headers are present on the response, *and* you
  //   are using a callback, it will parse the cookies and include
  //   them as a setCookie attribute on the object passed to the
  //   callback. setCookie is a simple map from cookie name to cookie
  //   value. If you want expiration time and attributes you'll have
  //   to parse it yourself. If there are multiple Set-Cookie headers
  //   for the same cookie it is unspecified which one you'll get.
  //
  // - You can provide a 'bodyStream' option which is a stream that
  //   will be used for the body of the request.
  //
  // - For authenticated Meteor Software services, you can set the
  //   'useSessionHeader' and/or 'useAuthHeader' options (to true) to
  //   send X-Meteor-Session/X-Meteor-Auth headers using values from
  //   the session file.
  //
  // - forceSSL is always set to true. Always. And followRedirect is
  //   set to false since it doesn't understand origins (see comment
  //   in implementation).
  //
  // - An optional options.onRequest callback may be provided if the
  //   caller desires access to the request object.
  //
  // NB: With useSessionHeader and useAuthHeader, this function will
  // read *and possibly write to* the session file, so if you are
  // writing auth code (in auth.js) and you call it, be sure to reread
  // the session file afterwards.
  request: function (urlOrOptions, callback) {
    var options;
    if (!_.isObject(urlOrOptions)) {
      options = { url: urlOrOptions };
    } else {
      options = _.clone(urlOrOptions);
    }

    var outputStream;
    if (_.has(options, 'outputStream')) {
      outputStream = options.outputStream;
      delete options.outputStream;
    }

    var bodyStream;
    if (_.has(options, 'bodyStream')) {
      bodyStream = options.bodyStream;
      delete options.bodyStream;
    }

    // Body stream length for progress
    var bodyStreamLength = 0;
    if (_.has(options, 'bodyStreamLength')) {
      bodyStreamLength = options.bodyStreamLength;
      delete options.bodyStreamLength;
    } else {
      // Guess the body stream length as 1MB
      // Hopefully if it's much bigger the caller will set it
      // If it is much small, we will pleasantly surprise the user!
      if (bodyStream) {
        bodyStreamLength = 1024 * 1024;
      }
    }

    // Response length for progress
    var responseLength = 128 * 1024;
    if (_.has(options, 'responseLength')) {
      responseLength = options.responseLength;
      delete options.responseLength;
    }

    var progress = null;
    if (_.has(options, 'progress')) {
      progress = options.progress;
      delete options.progress;

      if (callback) {
        throw new Error("Not safe to use progress with callback");
      }
    }

    options.headers = Object.assign({
      'User-Agent': getUserAgent()
    }, options.headers || {});

    // This should never, ever be false, or else why are you using SSL?
    options.forceSSL = true;
    if (process.env.CAFILE) {
      options.ca = files.readFile(process.env.CAFILE);
    }

    // followRedirect is very dangerous because request does not
    // appear to segregate cookies by origin, so any cookies (and
    // apparently headers as well, eg X-Meteor-Auth) sent on the
    // original request could get forwarded to an unexpected domain in
    // a redirect. This is almost certainly not something you ever
    // want.
    options.followRedirect = false;

    var useSessionHeader = options.useSessionHeader;
    delete options.useSessionHeader;
    var useAuthHeader = options.useAuthHeader;
    delete options.useAuthHeader;
    if (useSessionHeader || useAuthHeader) {
      var sessionHeader = auth.getSessionId(config.getAccountsDomain());
      if (sessionHeader) {
        options.headers['X-Meteor-Session'] = sessionHeader;
      }
      if (callback) {
        throw new Error("session header can't be used with callback");
      }
    }
    if (useAuthHeader) {
      var authHeader = auth.getSessionToken(config.getAccountsDomain());
      if (authHeader) {
        options.headers['X-Meteor-Auth'] = authHeader;
      }
    }

    var promise;
    if (! callback) {
      promise = new Promise(function (resolve, reject) {
        callback = function (err, response, body) {
          if (err) {
            reject(err);
            return;
          }

          var setCookie = {};
          _.each(response.headers["set-cookie"] || [], function (h) {
            var match = h.match(/^([^=\s]+)=([^;\s]+)/);
            if (match) {
              setCookie[match[1]] = match[2];
            }
          });

          if (useSessionHeader && _.has(response.headers, "x-meteor-session")) {
            auth.setSessionId(config.getAccountsDomain(),
                              response.headers['x-meteor-session']);
          }

          resolve({
            response: response,
            body: body,
            setCookie: setCookie
          });
        };
      });
    }

    // try to get proxy from environment.
    // similar code is in packages/ddp/stream_client_nodejs.js
    var proxy = process.env.HTTP_PROXY || process.env.http_proxy || null;
    // if we're going to an https url, try the https_proxy env variable first.
    if (/^https/i.test(options.url)) {
      proxy = process.env.HTTPS_PROXY || process.env.https_proxy || proxy;
    }
    if (proxy && !options.proxy) {
      options.proxy = proxy;
    }

    if (! _.has(options, "timeout")) {
      // 60 seconds for timeout between initial response headers and data,
      // and between chunks of data while reading the rest of the response.
      options.timeout = 60 * 1000 * timeoutScaleFactor;
    } else if (! (typeof options.timeout === "number" &&
                  options.timeout > 0)) {
      // The timeout can be disabled by passing anything other than a
      // positive number, e.g. { timeout: null }.
      delete options.timeout;
    }

    let onRequest;
    if (_.has(options, "onRequest")) {
      onRequest = options.onRequest;
      delete options.onRequest;
    }

    // request is the most heavy-weight of the tool's npm dependencies; don't
    // require it until we definitely need it.
    Console.debug("Doing HTTP request: ", options.method || 'GET', options.url);
    var request = require('request');
    var req = request(options, function (error, response, body) {
      if (! error &&
          response &&
          (typeof body === "string" ||
           Buffer.isBuffer(body))) {
        const contentLength = Number(response.headers["content-length"]);
        const actualLength = Buffer.byteLength(body);

        if (contentLength > 0 &&
            actualLength < contentLength) {
          error = new Error(
            "Expected " + contentLength + " bytes in request body " +
              "but received only " + actualLength);
        }
      }

      return callback.call(this, error, response, body);
    });

    if (_.isFunction(onRequest)) {
      onRequest(req);
    }

    var totalProgress = { current: 0, end: bodyStreamLength + responseLength, done: false };

    if (bodyStream) {
      var dest = req;
      if (progress) {
        dest = new WritableWithProgress(dest, function (n, done) {
          if (!totalProgress.done) {
            totalProgress.current += n;
            progress.reportProgress(totalProgress);
          }
        });
      }
      bodyStream.pipe(dest);
    }

    if (outputStream) {
      req.pipe(outputStream);
    }

    if (progress) {
      httpHelpers._addProgressEvents(req);
      req.on('progress', function (state) {
        if (!totalProgress.done) {
          totalProgress.current = bodyStreamLength + state.current;
          totalProgress.end = bodyStreamLength + state.end;
          totalProgress.done = state.done;

          progress.reportProgress(totalProgress);
        }
      });
    }

    if (promise) {
      try {
        return promise.await();
      } finally {
        if (progress) {
          progress.reportProgressDone();
        }
      }
    } else {
      return req;
    }
  },

  // Adds progress callbacks to a request
  // Based on request-progress
  _addProgressEvents: function (request) {
    var state = {};

    var emitProgress = function () {
      request.emit('progress', state);
    };

    request
      .on('response', function (response) {
        state.end = undefined;
        state.done = false;
        state.current = 0;
        var contentLength = response.headers['content-length'];
        if (contentLength) {
          state.end = Number(contentLength);
        }
        emitProgress();
      })
      .on('data', function (data) {
        state.current += data.length;
        emitProgress();
      })
      .on('end', function (data) {
        state.done = true;
        emitProgress();
      });
  },

  // A synchronous wrapper around request(...) that returns the response "body"
  // or throws.
  //
  // (This has gone through a few refactors and it might be possible
  // to fully roll it into httpHelpers.request() at this point.)
  getUrl: function (urlOrOptions) {
    try {
      var result = httpHelpers.request(urlOrOptions);
    } catch (e) {
      throw new files.OfflineError(e);
    }

    const response = result.response;
    const body = result.body;
    const href = response.request.href;

    if (response.statusCode >= 400 && response.statusCode < 600) {
      throw Error(
        body ||
          `Could not get ${href}; server returned [${response.statusCode}]`);
    } else {
      return body;
    }
  },

  // More or less as above, except with support for multiple attempts per
  // request and resuming on retries. This means if the connection is bad,
  // we can sometimes complete a request, even if each individual attempt fails.
  // We only use this for package downloads. In theory we could use it for
  // all requests but that seems like overkill and it isn't well tested in
  // other scenarioes.
  getUrlWithResuming(urlOrOptions) {
    const options = _.isObject(urlOrOptions) ? _.clone(urlOrOptions) : {
      url: urlOrOptions,
    };

    const maxAttempts =
      _.has(options, "maxAttempts")
      ? options.maxAttempts : 10;

    const retryDelaySecs =
      _.has(options, "retryDelaySecs")
      ? options.retryDelaySecs : 5;

    const masterProgress = options.progress;
    const outputStream = new ConcatStream();

    function attempt(triesRemaining = maxAttempts, startAt = 0) {
      if (startAt > 0) {
        options.headers = {
          ...options.headers,
          Range: `bytes=${startAt}-`
        };
      }

      if (masterProgress &&
          masterProgress.addChildTask) {
        options.progress = masterProgress.addChildTask({
          title: masterProgress.title
        });
      }

      try {
        return Promise.resolve(httpHelpers.request({
          outputStream,
          ...options,
        }));

      } catch (e) {
        const size = outputStream.size;
        const useTry = size === startAt;
        const change = size - startAt;

        if (!useTry || triesRemaining > 0) {
          if (useTry) {
            Console.debug(`Request failed, ${triesRemaining - 1} attempts left`);
          } else {
            Console.debug(`Request failed after ${change} bytes, retrying`);
          }

          return new Promise(
            resolve => setTimeout(resolve, retryDelaySecs * 1000)
          ).then(() => attempt(triesRemaining - (useTry ? 1 : 0), size));
        }

        Console.debug(`Request failed ${maxAttempts} times: failing`);
        return Promise.reject(new files.OfflineError(e));
      }
    }

    const result = attempt().await();
    const response = result.response
    if (response.statusCode >= 400 && response.statusCode < 600) {
      const href = response.request.href;
      throw Error(`Could not get ${href}; server returned [${response.statusCode}]`);
    }

    // Really end the stream if we got this far.
    outputStream.end(true);

    return outputStream.getBuffer();
  }
});