wbyoung/yahoo-finance-stream

View on GitHub
index.js

Summary

Maintainability
A
25 mins
Test Coverage
'use strict';

var _ = require('lodash');
var util = require('util');
var qs = require('querystring');
var request = require('request');
var JSONStream = require('JSONStream');
var Readable = require('stream').Readable;

function Stream(options) {
  if (!(this instanceof Stream)) {
    return new Stream(options);
  }

  Readable.call(this, { highWaterMark: 1, objectMode : true });

  this._request = null;
  this._symbols = [];
  this._options = _.defaults({}, options, {
    frequency: 60000,
    endpoint: 'https://query.yahooapis.com/v1/public/yql',
  });
}

util.inherits(Stream, Readable);

Stream.prototype.watch = function(symbol) {
  this._symbols.push(symbol.toUpperCase());
  this._schedule({ immediate: true });
};

Stream.prototype._url = function() {
  var symbols = this._symbols.map(function(s) {
    return JSON.stringify(s);
  });
  var yql = util.format(
    'select * from yahoo.finance.quotes ' +
    'where symbol in (%s)', symbols);
  var query = qs.stringify({
    q: yql,
    format: 'json',
    env: 'store://datatables.org/alltableswithkeys',
    callback: '',
  });
  return util.format('%s?%s', this._options.endpoint, query);
};

Stream.prototype._standardize = function(quote) {
  var standardized = { _quote: quote };
  return _.reduce(quote, function(obj, value, key) {
    if (/^[+-]?[\d\.]+$/.test(value)) { value = parseFloat(value); }
    else if (/^[+-]?[\d\.]+%$/.test(value)) { value = parseFloat(value) / 100.0; }
    obj[_.camelCase(key)] = value;
    return obj;
  }, standardized);
};

Stream.prototype._canRun = function() {
  return this._request === null && // only allow one request
    this._symbols.length !== 0; // only run when symbols are present
};

Stream.prototype._run = function() {
  if (!this._canRun()) {
    throw new Error('Cannot run when _canRun returns false.');
  }

  var self = this;
  var emitError = this._error.bind(this);
  var jsonPath = this._symbols.length === 1 ?
    'query.results.quote' :
    'query.results.quote.*';
  var stream = this._request = request({
    url: this._url(),
  })
  .on('error', emitError)
  .pipe(JSONStream.parse(jsonPath))
  .on('error', emitError);

  stream.on('data', function(quote) {
    if (!self.push(self._standardize(quote))) {
      self._running = false;
    }
  });

  stream.on('end', function() {
    self._request = null;
    self._schedule();
  });
};


Stream.prototype._schedule = function(options) {
  var opts = _.defaults({}, options);

  // if immediate flag is given or was previously given, we ensure that it
  // sticks until the next successful run occurs. we do this by storing an
  // instance variable to track it across scheduling requests. either the
  // option flag or the instance variable being set indicates that both should
  // be set.
  if (this._immediate || opts.immediate) {
    this._immediate = opts.immediate = true;
  }

  // remove any existing timer
  clearTimeout(this._timer);

  var self = this;
  var timeout = opts.immediate ? 0 : this._options.frequency;
  var run = function() {
    if (self._canRun()) {
      self._run();
      self._immediate = false; // run started, clear immediate flag
    }
  };

  // schedule the timer only if we're actually running
  if (this._running) {
    this._timer = setTimeout(run, timeout);
  }
};

Stream.prototype._error = function(e) {
  this.emit('error', e);
  this.close();
};

Stream.prototype._read = function(/*size*/) {
  // mark this as running & schedule a timer to actually run. if this wasn't
  // already running, then we schedule the run for immediate execution.
  var wasRunning = this._running;
  this._running = true;
  this._schedule({ immediate: !wasRunning });
};

Stream.prototype.close = function() {
  var self = this;
  var close = function() {
    self._running = false;
    self._schedule();
    self.push(null);
  };

  if (this._request) { this._request.on('end', close); }
  else { close(); }
};

module.exports = Stream;