syntheticore/declaire

View on GitHub
src/query.js

Summary

Maintainability
C
1 day
Test Coverage
var _ = require('./utils.js');


// Lazy filter over a collection or database collection
// The filter remains active and will emit change events when its results set changes
var Query = function(modelOrCollection, query, options) {
  query = query || {};
  options = options || {};

  var allCache;

  var getItems = function(onlyOne, cb, remoteFinished) {
    if(modelOrCollection.klass == 'Model') {
      var local = false;
      var localItems = modelOrCollection.dataInterface.all(_.merge(options, {query: query}), function(err, items) {
        // allCache = items;
        if(!local) {
          resolveInstances(items).then(function() {
            cb && cb(filter(items, onlyOne));
          });
        }
        remoteFinished && remoteFinished();
      });
      if(localItems && localItems.length) {
        local = true;
        resolveInstances(localItems).then(function() {
          cb && cb(filter(localItems, onlyOne));
        });
      }
      // return filter(localItems, onlyOne);
    } else {
      cb && cb(filter(modelOrCollection.items, onlyOne));
    }
  };

  var filter = function(items, onlyOne) {
    return _.select(items, function(item) {
      return true;
    }, onlyOne ? 1 : options.limit);
  };

  var resolveInstances = function(instances) {
    return _.resolvePromises(_.map(instances, function(inst) {
      return inst.resolve();
    }));
  };

  var subscribed = false;

  var subscribe = function() {
    if(modelOrCollection.klass == 'Model') {
      if(_.onClient()) {
        console.log("subscribe " + modelOrCollection.name);
        modelOrCollection.app.pubSub.subscribe('create update delete', modelOrCollection.name, function(data) {
          console.log("Updating query due to pubsub");
          getItems(false, null, function() {
            inst.emit('change:' + 'size');
            inst.emit('change');
          });
        });
        modelOrCollection.on('create', function() {
          inst.emit('change:' + 'size');
          inst.emit('change');
        });
        subscribed = true;
      }
    } else if(modelOrCollection.klass == 'Collection') {
      modelOrCollection.on('change:size', function() {
        allCache = null;
        inst.emit('change:' + 'size');
        inst.emit('change');
      });
      subscribed = true;
    } else {
      console.error('Queries work with models and collections only');
    }
  };

  var unsubscribe = function() {
    console.log("unsubscribe");
    subscribed = false;
  };

  var inst = {
    klass: 'Query',
    query: query,

    // Return actual results for this query,
    // from cache or from the data interface
    resolve: function(cb) {
      var self = this;
      if(allCache) {
        cb(allCache);
      } else {
        // var localResolve = false;
        // var localItems = getItems(false, function(items) {
        //   if(!localResolve) cb(items);
        // });
        // if(localItems && localItems.length) {
        //   localResolve = true;
        //   cb(localItems);
        // }
        getItems(false, cb);
      }
      return self;
    },

    // Return only the first match
    first: function(cb) {
      var self = this;
      if(allCache && allCache[0]) {
        cb(allCache[0]);
      } else {
        getItems(true, function(items) { //XXX return local items
          cb(items[0]);
        });
      }
      return self;
    },

    // Return another Query with the given filter applied
    filter: function(moreQuery) {
      return Query(modelOrCollection, _.deepMerge(query, moreQuery), options);
    },

    // Return another Query that has its limit set
    limit: function(limit) {
      return Query(modelOrCollection, query, _.merge(options, {limit: limit}));
    },

    // Return another query with a sort constraint set 
    sortBy: function(fieldOrFunc) {
      return this.clone();
    },

    // Call the given method on all result objects
    // Will resolve this collection
    invoke: function(method) {
      var args = Array.prototype.slice.call(arguments).splice(1);
      this.resolve(function(items) {
        _.invoke.apply(null, _.union([items, method], args));
      });
      return this;
    },

    clone: function() {
      return Query(modelOrCollection, query, options);
    },

    // Return a promise that resolves to the result set's length
    size: function() {
      var self = this;
      return _.promise(function(ok, fail) {
        self.resolve(function(items) {
          ok(items.length);
        });
      });
    }
  };

  _.eventHandling(inst);

  // Dynamically subscribe and unsubscribe when listeners are added and removed
  inst.on('listenerAdded', function() {
    if(!subscribed) {
      subscribe();
    }
  });

  inst.on('listenerRemoved', function() {
    if(subscribed && inst.listeners.length == 0) {
      // Defer unsubscribtion to allow immediately readding
      // handlers afer dropping to zero
      _.defer(function() {
        if(subscribed && inst.listeners.length == 0) {
          unsubscribe();
          allCache = null;
        }
      }, 1000);
    }
  });

  return inst;
};


module.exports = Query;