omardelarosa/mongo-elasticsearch

View on GitHub
lib/transfer.js

Summary

Maintainability
A
3 hrs
Test Coverage
var MongoClient = require('mongodb').MongoClient;
var elasticsearch = require('elasticsearch');

// Fall back to Q's Promise implementation on runtimes that have no global
// `Promise`. The assignment intentionally has no `var`: a `var Promise` here
// would be hoisted to the top of the module, making `typeof Promise` always
// 'undefined' and shadowing the native implementation even when it exists.
if (typeof Promise === 'undefined') { Promise = require('q').Promise; }

/**
 * Holds the configuration and clients needed to copy documents from one
 * MongoDB collection into one Elasticsearch index/type.
 *
 * @constructor
 * @param {Object} opts
 * @param {Object} opts.esOpts - Passed unchanged to `new elasticsearch.Client`;
 *   must contain a truthy `host`.
 * @param {string} opts.mongoUri - Connection string handed to `MongoClient.connect`.
 * @param {string} opts.mongoSourceCollection - Name of the collection to read.
 * @param {string} opts.esTargetIndex - Index the documents are created in.
 * @param {string} opts.esTargetType - Type the documents are created with.
 * @param {*} [opts.debug] - Stored on the instance as `debug`.
 * @param {number} [opts.timeout] - Milliseconds after which
 *   `findBatchedDocuments` rejects. Left unset when omitted, so a value
 *   assigned to the instance (or prototype) by other means still applies.
 * @throws {Error} When any required option is missing.
 */
function Transfer (opts){
  if (!opts || !opts.esOpts || !opts.esOpts.host || !opts.mongoUri) {
    throw new Error('Invalid opts.  esOpts and mongoUri are required');
  }
  if (!opts.esTargetIndex) {
    throw new Error('esTargetIndex is required');
  }
  if (!opts.mongoSourceCollection) {
    throw new Error('mongoSourceCollection is required');
  }
  if (!opts.esTargetType) {
    throw new Error('esTargetType is required');
  }
  this.ES = new elasticsearch.Client(opts.esOpts);
  this.esOpts = opts.esOpts;
  this.mongoUri = opts.mongoUri;
  this.mongoSourceCollection = opts.mongoSourceCollection;
  this.debug = opts.debug;
  this.esTargetIndex = opts.esTargetIndex;
  this.esTargetType = opts.esTargetType;
  // findBatchedDocuments reads this.timeout; expose it as a constructor
  // option so the timeout can be configured without poking the instance.
  if (opts.timeout !== undefined) {
    this.timeout = opts.timeout;
  }
}

/**
 * Test-only hook: swaps the Elasticsearch client stored on this instance.
 *
 * @param {Object} newClient - Object to use as `this.ES` from now on.
 */
Transfer.prototype.__setESClient = function (newClient) {
  var transfer = this;
  transfer.ES = newClient;
};

/**
 * Connects to MongoDB using `this.mongoUri`.
 *
 * On success the value passed to the connect callback is stored on the
 * instance as `this.db` and the promise resolves with it; on failure the
 * promise rejects with the error reported by `MongoClient.connect`.
 *
 * @returns {Promise} Resolves with the connected handle.
 */
Transfer.prototype.connectToMongo = function() {
  var self = this;

  function openConnection(resolve, reject) {
    function onConnected(err, db) {
      if (err) {
        reject(err);
        return;
      }
      self.db = db;
      resolve(db);
    }

    MongoClient.connect(self.mongoUri, onConnected);
  }

  return new Promise(openConnection);
};

/**
 * Creates one Elasticsearch document from `doc` in
 * `this.esTargetIndex` / `this.esTargetType`.
 *
 * The `_id` and `id` properties are removed from `doc` itself (the caller's
 * object is modified) so that Elasticsearch assigns the identifier.
 *
 * @param {Object} doc - Document to index; used as the request body.
 * @returns {Promise} Resolves with the response passed to the `ES.create`
 *   callback, rejects with the error it reports.
 */
Transfer.prototype.createESDocument = function (doc) {
  var self = this;
  return new Promise(function (resolve, reject) {
    // Let ES generate IDs. Delete unconditionally: a truthiness check would
    // leave falsy ids (0, '') in the body.
    delete doc._id;
    delete doc.id;
    self.ES.create({
      index: self.esTargetIndex,
      type: self.esTargetType,
      body: doc
    }, function (err, res) {
      if (err) { console.log('error', err); return reject(err); }
      resolve(res);
    });
  });
};

/**
 * Looks up the source collection on the current connection.
 *
 * @returns {Object} Result of `this.db.collection(this.mongoSourceCollection)`.
 * @throws {Error} 'No DB connection' when `this.db` is not set.
 */
Transfer.prototype.getCollection = function () {
  var connection = this.db;
  if (connection) {
    return connection.collection(this.mongoSourceCollection);
  }
  throw new Error('No DB connection');
};

/**
 * Walks a cursor over every document of the source collection and starts an
 * Elasticsearch write (`createESDocument`) for each one. Writes are started
 * as documents are read and are not awaited before the next read.
 *
 * The cursor is stored on the instance as `this.cursor`.
 *
 * The returned promise settles exactly once:
 *  - resolves with `{ status: 'ok', documentsAdded: <count> }` once the cursor
 *    is exhausted and every started write has succeeded;
 *  - rejects with `{ status: 'timeout', message: ... }` when `this.timeout`
 *    (milliseconds, optional) elapses first;
 *  - rejects with the underlying error when the collection lookup, the
 *    cursor, or any write fails.
 * Once it has settled, the timeout timer is cleared and no further documents
 * are pulled from the cursor.
 *
 * @returns {Promise}
 */
Transfer.prototype.findBatchedDocuments = function () {
  var self = this;
  return new Promise(function (resolve, reject) {
    var timer;
    var settled = false;
    var counter = 0;
    var writePromises = [];

    // Single exit point: guarantees one settlement and releases the timer on
    // success and on failure alike.
    function settle(finish, value) {
      if (settled) { return; }
      settled = true;
      if (timer) { clearTimeout(timer); }
      finish(value);
    }

    function fail(err) {
      settle(reject, err);
    }

    var collection;
    try {
      collection = self.getCollection();
    } catch (e) {
      // Stop here; there is no collection to iterate.
      return fail(e);
    }
    var cursor = collection.find({});
    self.cursor = cursor;

    function startWrite(doc) {
      var p = self.createESDocument(doc);
      writePromises.push(p);
      // Observe each write immediately so a failure is reported right away
      // instead of surfacing as an unhandled rejection while reading goes on.
      if (p && typeof p.then === 'function') {
        p.then(null, fail);
      }
    }

    function getNext(doc) {
      if (doc) {
        ++counter;
        try {
          startWrite(doc);
        } catch (e) {
          fail(e);
        }
      }
      if (settled) { return undefined; }
      return cursor.hasNext().then(function (hasMore) {
        if (settled) { return undefined; }
        if (hasMore) {
          // Each step carries its own rejection handler instead of being
          // returned, so a large collection does not build one ever-growing
          // promise chain.
          cursor.next().then(getNext).then(null, fail);
          return undefined;
        }
        return Promise.all(writePromises).then(function () {
          settle(resolve, { status: 'ok', documentsAdded: counter });
        });
      });
    }

    if (self.timeout) {
      timer = setTimeout(
        function () {
          fail({ status: 'timeout', message: ( 'Timed out after ' + self.timeout + 'ms' ) });
        },
        self.timeout
      );
    }

    try {
      getNext().then(null, fail);
    } catch (e) {
      // A synchronous throw from the cursor must still release the timer.
      fail(e);
    }
  });
};

/**
 * Runs the whole transfer: connects to MongoDB, copies the documents with
 * `findBatchedDocuments`, then closes the connection stored in `this.db`.
 *
 * The connection is closed whether the transfer succeeds or fails, so a
 * failed run does not leave it open.
 *
 * @returns {Promise} Resolves with the result of `findBatchedDocuments`;
 *   rejects with the error from connecting or transferring.
 */
Transfer.prototype.start = function () {
  var self = this;

  function closeDb() {
    if (self.db) { self.db.close(); }
  }

  // Wrapping the first call turns a synchronous throw into a rejection.
  return new Promise(function (resolve) {
    resolve(self.connectToMongo());
  })
  .then(function () {
    return self.findBatchedDocuments();
  })
  .then(function (res) {
    closeDb();
    return res;
  }, function (err) {
    try {
      closeDb();
    } catch (closeErr) {
      // Deliberately ignored: the transfer error is the one worth reporting.
    }
    throw err;
  });
};

module.exports = Transfer;