meteor/meteor

View on GitHub
packages/socket-stream-client/node.js

Summary

Maintainability
A
3 hrs
Test Coverage
import { Meteor } from "meteor/meteor";
import { toWebsocketUrl } from "./urls.js";
import { StreamClientCommon } from "./common.js";

// @param endpoint {String} URL to Meteor app
//   "http://subdomain.meteor.com/" or "/" or
//   "ddp+sockjs://foo-**.meteor.com/sockjs"
//
// We do some rewriting of the URL to eventually make it "ws://" or "wss://",
// whatever was passed in.  At the very least, what Meteor.absoluteUrl() returns
// us should work.
//
// We don't do any heartbeating. (The logic that did this in sockjs was removed,
// because it used a built-in sockjs mechanism. We could do it with WebSocket
// ping frames or with DDP-level messages.)
export class ClientStream extends StreamClientCommon {
  constructor(endpoint, options) {
    super(options);

    this.client = null; // created in _launchConnection
    this.endpoint = endpoint;

    this.headers = this.options.headers || {};
    this.npmFayeOptions = this.options.npmFayeOptions || {};

    this._initCommon(this.options);

    //// Kickoff!
    this._launchConnection();
  }

  // data is a utf8 string. Data sent while not connected is dropped on
  // the floor, and it is up the user of this API to retransmit lost
  // messages on 'reset'
  send(data) {
    if (this.currentStatus.connected) {
      this.client.send(data);
    }
  }

  // Changes where this connection points
  _changeUrl(url) {
    this.endpoint = url;
  }

  _onConnect(client) {
    if (client !== this.client) {
      // This connection is not from the last call to _launchConnection.
      // But _launchConnection calls _cleanup which closes previous connections.
      // It's our belief that this stifles future 'open' events, but maybe
      // we are wrong?
      throw new Error('Got open from inactive client ' + !!this.client);
    }

    if (this._forcedToDisconnect) {
      // We were asked to disconnect between trying to open the connection and
      // actually opening it. Let's just pretend this never happened.
      this.client.close();
      this.client = null;
      return;
    }

    if (this.currentStatus.connected) {
      // We already have a connection. It must have been the case that we
      // started two parallel connection attempts (because we wanted to
      // 'reconnect now' on a hanging connection and we had no way to cancel the
      // connection attempt.) But this shouldn't happen (similarly to the client
      // !== this.client check above).
      throw new Error('Two parallel connections?');
    }

    this._clearConnectionTimer();

    // update status
    this.currentStatus.status = 'connected';
    this.currentStatus.connected = true;
    this.currentStatus.retryCount = 0;
    this.statusChanged();

    // fire resets. This must come after status change so that clients
    // can call send from within a reset callback.
    this.forEachCallback('reset', callback => {
      callback();
    });
  }

  _cleanup(maybeError) {
    this._clearConnectionTimer();
    if (this.client) {
      var client = this.client;
      this.client = null;
      client.close();

      this.forEachCallback('disconnect', callback => {
        callback(maybeError);
      });
    }
  }

  _clearConnectionTimer() {
    if (this.connectionTimer) {
      clearTimeout(this.connectionTimer);
      this.connectionTimer = null;
    }
  }

  _getProxyUrl(targetUrl) {
    // Similar to code in tools/http-helpers.js.
    var proxy = process.env.HTTP_PROXY || process.env.http_proxy || null;
    var noproxy = process.env.NO_PROXY || process.env.no_proxy || null;
    // if we're going to a secure url, try the https_proxy env variable first.
    if (targetUrl.match(/^wss:/) || targetUrl.match(/^https:/)) {
      proxy = process.env.HTTPS_PROXY || process.env.https_proxy || proxy;
    }
    if (targetUrl.indexOf('localhost') != -1 || targetUrl.indexOf('127.0.0.1') != -1) {
      return null;
    }
    if (noproxy) {
      for (let item of noproxy.split(',')) {
        if (targetUrl.indexOf(item.trim().replace(/\*/, '')) !== -1) {
          proxy = null;
        }
      }
    }
    return proxy;
  }

  _launchConnection() {
    this._cleanup(); // cleanup the old socket, if there was one.

    // Since server-to-server DDP is still an experimental feature, we only
    // require the module if we actually create a server-to-server
    // connection.
    var FayeWebSocket = Npm.require('faye-websocket');
    var deflate = Npm.require('permessage-deflate');

    var targetUrl = toWebsocketUrl(this.endpoint);
    var fayeOptions = {
      headers: this.headers,
      extensions: [deflate]
    };
    fayeOptions = Object.assign(fayeOptions, this.npmFayeOptions);
    var proxyUrl = this._getProxyUrl(targetUrl);
    if (proxyUrl) {
      fayeOptions.proxy = { origin: proxyUrl };
    }

    // We would like to specify 'ddp' as the subprotocol here. The npm module we
    // used to use as a client would fail the handshake if we ask for a
    // subprotocol and the server doesn't send one back (and sockjs doesn't).
    // Faye doesn't have that behavior; it's unclear from reading RFC 6455 if
    // Faye is erroneous or not.  So for now, we don't specify protocols.
    var subprotocols = [];

    var client = (this.client = new FayeWebSocket.Client(
      targetUrl,
      subprotocols,
      fayeOptions
    ));

    this._clearConnectionTimer();
    this.connectionTimer = Meteor.setTimeout(() => {
      this._lostConnection(new this.ConnectionError('DDP connection timed out'));
    }, this.CONNECT_TIMEOUT);

    this.client.on(
      'open',
      Meteor.bindEnvironment(() => {
        return this._onConnect(client);
      }, 'stream connect callback')
    );

    var clientOnIfCurrent = (event, description, callback) => {
      this.client.on(
        event,
        Meteor.bindEnvironment((...args) => {
          // Ignore events from any connection we've already cleaned up.
          if (client !== this.client) return;
          callback(...args);
        }, description)
      );
    };

    clientOnIfCurrent('error', 'stream error callback', error => {
      if (!this.options._dontPrintErrors)
        Meteor._debug('stream error', error.message);

      // Faye's 'error' object is not a JS error (and among other things,
      // doesn't stringify well). Convert it to one.
      this._lostConnection(new this.ConnectionError(error.message));
    });

    clientOnIfCurrent('close', 'stream close callback', () => {
      this._lostConnection();
    });

    clientOnIfCurrent('message', 'stream message callback', message => {
      // Ignore binary frames, where message.data is a Buffer
      if (typeof message.data !== 'string') return;

      this.forEachCallback('message', callback => {
        callback(message.data);
      });
    });
  }
}