mbroadst/rethunk

View on GitHub
lib/cursor.js

Summary

Maintainability
D
3 days
Test Coverage
"use strict";
var Promise = require('bluebird'),
    EventEmitter = require('events').EventEmitter,
    errors = require('./error.js'),
    helper = require('./helper.js');

var MAX_CALL_STACK = 1000;

function Cursor(connection, token, options, type) {
  this.connection = connection;
  this.token = token;

  this._stackSize = 0; // Estimation of our call stack.
  this._index = 0; // Position in this._data[0]
  this._data = []; // Array of non empty arrays
  this._fetching = false; // Are we fetching data
  this._canFetch = true; // Can we fetch more data?
  this._pendingPromises = []; // Pending promises' resolve/reject
  this.options = options || {};
  this._closed = false;
  this._closingPromise = null; // Promise returned by close
  this._type = type;
  this._setIncludesStates = false;
  this._emittedEnd = false;

  if ((type === 'feed') || (type === 'atomFeed')) {
    this.toArray = this._unsupportedToArray;
  }
}

Cursor.prototype.toString = function() {
  return '[object ' + this._type + ']';
};

Cursor.prototype.setIncludesStates = function() {
  this._setIncludesStates = true;
};

Cursor.prototype.includesStates = function() {
  return this._setIncludesStates;
};

Cursor.prototype.getType = function() {
  return this._type;
};

Cursor.prototype.toJSON = function() {
  if (this._type === 'Cursor')
    throw new errors.ReqlDriverError('You cannot serialize a Cursor to JSON. Retrieve data from the cursor with `toArray` or `next`');
  throw new errors.ReqlDriverError('You cannot serialize a ' + this._type + ' to JSON. Retrieve data from the cursor with `each` or `next`');
};

Cursor.prototype._next = function(callback) {
  var self = this;
  if (self._closed === true) {
    return Promise.reject(new errors.ReqlDriverError('You cannot call `next` on a closed ' + self._type))
      .nodeify(callback);
  }

  if ((self._data.length === 0) && (self._canFetch === false)) {
    return Promise.reject(new errors.ReqlDriverError('No more rows in the ' + self._type.toLowerCase()))
      .nodeify(callback);
  }

  if ((self._data.length > 0) && (self._data[0].length > self._index)) {
    var result = self._data[0][self._index++];
    if (result instanceof Error) {
      return Promise.reject(result).nodeify(callback);
    } else {
      // This could be possible if we get back batch with just one document?
      if (self._data[0].length === self._index) {
        self._index = 0;
        self._data.shift();
        if ((self._data.length === 1)
          && (self._canFetch === true)
          && (self._closed === false)
          && (self._fetching === false)) {
            self._fetch();
        }
      }
      return Promise.resolve(result).nodeify(callback);
    }
  }

  return new Promise(function(resolve, reject) {
    self._pendingPromises.push({resolve: resolve, reject: reject});
  }).nodeify(callback);
};

// @todo: this is potentially badly named, but is code duplication, the note
// left was: 'This could be possible if we get back batch with just one document?'
Cursor.prototype._fetchMoreIfPossible = function() {
  if (this._data[0].length === this._index) {
    this._index = 0;
    this._data.shift();
    if ((this._data.length <= 1) &&
        (this._canFetch === true) &&
        (this._closed === false) &&
        (this._fetching === false)) {
      this._fetch();
    }
  }
};

Cursor.prototype.hasNext = function() {
  throw new Error('The `hasNext` command has been removed in 1.13, please use `next`.');
};

Cursor.prototype.toArray = function(callback) {
  var self = this;
  var promise = new Promise(function(resolve, reject) {
    var result = [];
    self._each(function(err, data) {
      if (err) {
        reject(err);
      } else {
        result.push(data);
      }
    }, function() {
      resolve(result);
    });
  });

  return promise.nodeify(callback);
};

Cursor.prototype._fetch = function() {
  var self = this;
  this._fetching = true;

  var p = new Promise(function(resolve, reject) {       // jshint ignore:line
    self.connection._continue(self.token, resolve, reject);
  }).then(function(response) {
    self._push(response);
    return null;
  }).error(function(error) {
    self._fetching = false;
    self._canFetch = false;
    self._pushError(error);
  });
};

Cursor.prototype._push = function(data) {
  var couldfetch = this._canFetch;
  if (data.done) this._done();
  var response = data.response;
  this._fetching = false;

  // If the cursor was closed, we ignore all following response
  if ((response.r.length > 0) && (couldfetch === true)) {
    this._data.push(helper.makeSequence(response, this.options));
  }

  // this._fetching = false
  if ((this._closed === false) && (this._canFetch) && (this._data.length <= 1)) this._fetch();
  this._flush();
};

// Try to solve as many pending promises as possible
Cursor.prototype._flush = function() {
  while ((this._pendingPromises.length > 0) && ((this._data.length > 0) || ((this._fetching === false) && (this._canFetch === false)))) {
    var fullfiller = this._pendingPromises.shift();
    var resolve = fullfiller.resolve;
    var reject = fullfiller.reject;

    if (this._data.length <= 0) {
      return reject(new errors.ReqlDriverError('No more rows in the ' + this._type.toLowerCase()));
    }

    var result = this._data[0][this._index++];
    if (result instanceof Error) {
      return reject(result);
    }

    resolve(result);
    this._fetchMoreIfPossible();
  }
};

Cursor.prototype._pushError = function(error) {
  this._data.push([error]);
  this._flush();
};

Cursor.prototype._done = function() {
  this._canFetch = false;
  if (this._eventEmitter && (this._emittedEnd === false)) {
    this._emittedEnd = true;
    this._eventEmitter.emit('end');
  }
};

Cursor.prototype._set = function(ar) {
  this._fetching = false;
  this._canFetch = false;
  if (ar.length > 0) {
    this._data.push(ar);
  }

  this._flush();
};

Cursor.prototype.close = function(callback) {
  var self = this;
  if (self._closed === true) {
    return self._closingPromise;
  }

  self._closed = true;
  self._closingPromise = new Promise(function(resolve, reject) {
    if ((self._canFetch === false) && (self._fetching === false)) {
      resolve();
    } else {
      // since v0_4 (RethinkDB 2.0) we can (must) force a STOP request even if a CONTINUE query is pending
      var endCallback = function() {
        if (self._eventEmitter && (self._emittedEnd === false)) {
          self._emittedEnd = true;
          self._eventEmitter.emit('end');
        }
        resolve();
      };

      self.connection._end(self.token, endCallback, reject);
    }
  });

  return self._closingPromise.nodeify(callback);
};

Cursor.prototype._each = function(callback, onFinish) {
  if (this._closed === true) {
    return callback(new errors.ReqlDriverError('You cannot retrieve data from a cursor that is closed'));
  }

  var self = this;
  var reject = function(err) {
    if (err.message === 'No more rows in the ' + self._type.toLowerCase() + '.') {
      if (typeof onFinish === 'function') onFinish();
    } else {
      callback(err);
    }
  };

  var resolve = function(data) {
    self._stackSize++;
    var keepGoing = callback(null, data);
    if (keepGoing === false) {
      if (typeof onFinish === 'function') onFinish();
    } else {
      if (self._closed === false) {
        if (self._stackSize <= MAX_CALL_STACK) {
          self._next().then(resolve).error(function(error) {
            if ((error.message !== 'You cannot retrieve data from a cursor that is closed.') &&
                (error.message.match(/You cannot call `next` on a closed/) === null)) {
              reject(error);
            }
          });
        } else {
          setTimeout(function() {
            self._stackSize = 0;
            self._next().then(resolve).error(function(error) {
              if ((error.message !== 'You cannot retrieve data from a cursor that is closed.') &&
                  (error.message.match(/You cannot call `next` on a closed/) === null)) {
                reject(error);
              }
            });
          }, 0);
        }
      }
    }

    return null;
  };


  self._next().then(resolve)
    .catch(errors.ReqlBaseError, function(error) {
      if ((error.message !== 'You cannot retrieve data from a cursor that is closed.') &&
          (error.message.match(/You cannot call `next` on a closed/) === null)) {
        reject(error);
      }
  });

  return null;
};

Cursor.prototype._eachAsync = function(callback) {
  var self = this;
  return new Promise(function(resolve, reject) {
    self._eachAsyncInternal(callback, resolve, reject);
  });
};

Cursor.prototype._eachAsyncInternal = function(callback, finalResolve, finalReject) {
  if (this._closed === true) {
    finalReject(new errors.ReqlDriverError('You cannot retrieve data from a cursor that is closed'));
    return;
  }
  var self = this;

  var nextCb = function() {
    self._stackSize++;
    self._next().then(function(row) {
      if (self._stackSize <= MAX_CALL_STACK) {
        if (callback.length <= 1) {
          Promise.resolve(callback(row)).then(nextCb);
          return null;
        }
        else {
          new Promise(function(resolve, reject) {
            return callback(row, resolve);
          }).then(nextCb);
          return null;
        }
      }
      else {
        new Promise(function(resolve, reject) {
          setTimeout(function() {
            self._stackSize = 0;
            if (callback.length <= 1) {
              Promise.resolve(callback(row)).then(resolve).catch(reject);
            }
            else {
              new Promise(function(resolve, reject) {
                return callback(row, resolve);
              }).then(resolve).catch(reject);
              return null;
            }
          }, 0);
        }).then(nextCb);
        return null;
      }
    }).error(function(error) {
      if ((error.message === 'No more rows in the '+self._type.toLowerCase()+'.') ||
          (error.message === 'You cannot retrieve data from a cursor that is closed.') ||
          (error.message.match(/You cannot call `next` on a closed/) !== null)) {
        return finalResolve();
      }

      return finalReject(error);
    });
  };
  nextCb();
};

Cursor.prototype.eachAsync = Cursor.prototype._eachAsync;
Cursor.prototype.next = Cursor.prototype._next;
Cursor.prototype.each = Cursor.prototype._each;
Cursor.prototype._unsupportedToArray = function() {
  throw new Error('The `toArray` method is not available on feeds.');
};

Cursor.prototype._makeEmitter = function() {
  this.next = function() {
    throw new errors.ReqlDriverError('You cannot call `next` once you have bound listeners on the ' + this._type);
  };
  this.each = function() {
    throw new errors.ReqlDriverError('You cannot call `each` once you have bound listeners on the ' + this._type);
  };
  this.toArray = function() {
    throw new errors.ReqlDriverError('You cannot call `toArray` once you have bound listeners on the ' + this._type);
  };
  this._eventEmitter = new EventEmitter();
};

Cursor.prototype._eachCb = function(err, data) {
  // We should silent things if the cursor/feed is closed
  if (this._closed === false) {
    if (err) {
      this._eventEmitter.emit('error', err);
    } else {
      this._eventEmitter.emit('data', data);
    }
  }
};

var methods = [
  'addListener',
  'on',
  'once',
  'removeListener',
  'removeAllListeners',
  'setMaxListeners',
  'listeners',
  'emit'
];

for (var i = 0; i < methods.length; i++) {
  (function(n) {    // jshint ignore:line
    var method = methods[n];
    Cursor.prototype[method] = function() {
      var self = this;
      if (self._eventEmitter === null || self._eventEmitter === undefined) {
        self._makeEmitter();
        setImmediate(function() {
          self._each(self._eachCb.bind(self), function() {
            if (self._emittedEnd === false) {
              self._emittedEnd = true;
              self._eventEmitter.emit('end');
            }
          });
        });
      }

      var _len = arguments.length;
      var _args = new Array(_len);
      for (var _i = 0; _i < _len; _i++) {_args[_i] = arguments[_i];}
      self._eventEmitter[method].apply(self._eventEmitter, _args);
    };
  })(i);      // jshint ignore:line
}

module.exports = Cursor;