// linagora/hublin
//
// View on GitHub: backend/core/conference/index.js
//
// Summary
//   Maintainability: D (estimated remediation time: 1 day)
//   Test Coverage: (no value captured)
'use strict';

const extend = require('extend');
const q = require('q');
const async = require('async');
const mongoose = require('mongoose');
const Conference = mongoose.model('Conference');
const ConferenceArchive = mongoose.model('ConferenceArchive');
const localpubsub = require('../pubsub').local;
const globalpubsub = require('../pubsub').global;
const logger = require('../logger');
const scale = require('./scalability');

// Conference time to live, in milliseconds (10 minutes). isActive() compares
// it with the conference creation date when no member is online.
const TTL = 10 * 60 * 1000;

// Garbage time to live, in milliseconds (24 hours). isActive() compares it
// with the publication date of the latest history entry.
const GARBAGE_TTL = 24 * 60 * 60 * 1000;

// Statuses a conference member can have.
const MEMBER_STATUS = {
  INVITED: 'invited',
  ONLINE: 'online',
  OFFLINE: 'offline'
};

// Names of the pubsub topics used to publish conference events.
const EVENTS = {
  join: 'conference:join',
  leave: 'conference:leave',
  invite: 'conference:invite',
  create: 'conference:create'
};

/**
 * Persist a new conference.
 *
 * The input is first passed through `scale`, the resulting data is saved as a
 * Conference document, and a creation event carrying the saved conference and
 * its first member is forwarded from the local to the global pubsub.
 *
 * @param {object} conference - data of the conference to create
 * @param {function} callback - called with (err, savedConference)
 * @return {*} the callback result when `conference` is missing, nothing otherwise
 */
function create(conference, callback) {
  if (!conference) {
    return callback(new Error('Conference can not be null'));
  }

  // Runs once the document has been written (or has failed to be written).
  function onSaved(saveError, saved) {
    if (saveError) {
      return callback(saveError, null);
    }

    const creationTopic = localpubsub.topic(EVENTS.create);
    creationTopic.forward(globalpubsub, { conference: saved, user: saved.members[0] });

    return callback(saveError, saved);
  }

  // Runs once `scale` has processed the incoming conference data.
  function onScaled(scaleError, scaled) {
    if (scaleError) {
      return callback(scaleError, null);
    }

    const document = new Conference(scaled);
    return document.save(onSaved);
  }

  scale(conference, onScaled);
}

/**
 * Append an entry to the `history` array of a conference, recording that a
 * user triggered an event.
 *
 * @param {object} conference - conference to update; must carry an `_id`
 * @param {object} user - actor of the event (`id`, optional `objectType` and `displayName`)
 * @param {string} status - identifier of the event, stored as the entry object `_id`
 * @param {function} callback - handed over to `Conference.update`
 * @return {*} the callback result when an argument is invalid, nothing otherwise
 */
function addHistory(conference, user, status, callback) {
  if (!user) {
    return callback(new Error('Undefined user'));
  }

  if (!conference || !conference._id) {
    return callback(new Error('Undefined or invalid conference'));
  }

  if (!status) {
    return callback(new Error('Undefined status'));
  }

  const conferenceId = conference._id;

  // Who did it: falls back to an anonymous type and an empty display name.
  const actor = {
    objectType: user.objectType || 'hublin:anonymous',
    _id: user.id,
    displayName: user.displayName || ''
  };

  const entry = {
    verb: 'event',
    target: [{ objectType: 'conference', _id: conferenceId }],
    object: { objectType: 'event', _id: status },
    actor: actor
  };

  Conference.update({ _id: conferenceId }, { $push: { history: entry } }, { upsert: false }, callback);
}

/**
 * Look up a single conference member from its (id, objectType) tuple.
 *
 * The conference is queried by `_id` with a projection that only keeps the
 * member matching the tuple.
 *
 * @param {Conference} conference - conference holding the member (its `_id` is used)
 * @param {Tuple} tuple - `id` and `objectType` identifying the member
 * @param {Function} callback - called with (err) or (null, member)
 * @return {*}
 */
function getMember(conference, tuple, callback) {
  const query = { _id: conference._id };
  const projection = { members: { $elemMatch: { id: tuple.id, objectType: tuple.objectType } } };

  Conference.findOne(query, projection).exec((err, conf) => {
    if (err) {
      return callback(err);
    }

    const hasMatch = Boolean(conf && conf.members && conf.members.length !== 0);

    return hasMatch ? callback(null, conf.members[0]) : callback(new Error('No such user'));
  });
}

/**
 * Publish an invitation for one member of a conference.
 *
 * The member is first reloaded through `getMember`; the invitation is then
 * forwarded on the invite topic from the local to the global pubsub.
 *
 * @param {Conference} conference - conference the member is invited to
 * @param {Member} creator - member who sends the invitation
 * @param {Member} member - member to invite
 * @param {string} baseUrl - The conference server URL
 * @param {Function} callback - called with (err) on failure, without argument on success
 * @return {*}
 */
function sendInvitation(conference, creator, member, baseUrl, callback) {
  const inviteTopic = localpubsub.topic(EVENTS.invite);

  getMember(conference, member, (err, found) => {
    if (err) {
      return callback(err);
    }

    if (!found) {
      return callback(new Error('No such member found'));
    }

    inviteTopic.forward(globalpubsub, {
      conference,
      user: found,
      creator,
      baseUrl
    });

    return callback();
  });
}

/**
 * Add a list of members to a conference with the "invited" status, save the
 * conference, then send an invitation to each of them.
 *
 * A failure to send one invitation is only logged: the callback still
 * receives the saved conference.
 *
 * @param {Conference} conference - conference document to update and save
 * @param {Member} creator - user inviting into the conference
 * @param {[member]} members - an array of members
 * @param {string} baseUrl - The conference server URL
 * @param {function} callback - called with (err) or (null, savedConference)
 * @return {*}
 */
function invite(conference, creator, members, baseUrl, callback) {
  if (!conference) {
    return callback(new Error('Can not invite to an undefined conference'));
  }

  if (!members) {
    return callback(new Error('Can not invite undefined members'));
  }

  if (!Array.isArray(members)) {
    return callback(new Error('members parameter should be an array'));
  }

  members.forEach((member) => {
    // Work on a deep copy so that the caller's member objects stay untouched.
    const invited = extend(true, {}, member);

    if (!invited.displayName) {
      invited.displayName = invited.id;
    }
    invited.status = MEMBER_STATUS.INVITED;
    conference.members.push(invited);
  });

  conference.save((err, updated) => {
    if (err) {
      return callback(err);
    }

    // Best effort: an invitation failure never interrupts the iteration.
    const notify = (member, done) => {
      sendInvitation(updated, creator, member, baseUrl, (sendError) => {
        if (sendError) {
          logger.error('Error while sending invitation to', member, sendError);
        }
        return done();
      });
    };

    async.each(members, notify, () => callback(null, updated));
  });
}

/**
 * Fetch a conference from its identifier.
 *
 * @param {string} id - value matched against the conference `_id`
 * @param {function} callback - handed over to `Conference.findOne`
 * @return {*} whatever `Conference.findOne` returns
 */
function get(id, callback) {
  const byId = { _id: id };

  return Conference.findOne(byId, callback);
}

/**
 * Fetch the conference having a member whose token is the given one.
 *
 * @param {String} token - member token to look for
 * @param {Function} callback - handed over to the query `exec`
 */
function getFromMemberToken(token, callback) {
  const query = Conference.findOne({ 'members.token': token });

  query.exec(callback);
}

/**
 * Find the conference member owning a token.
 *
 * @param {String} token - member token to look for
 * @param {Function} callback - called with (err), with no argument when no
 *   conference holds the token, or with (null, member)
 * @return {*}
 */
function getMemberFromToken(token, callback) {
  getFromMemberToken(token, (err, conference) => {
    if (err) {
      return callback(err);
    }

    if (!conference) {
      return callback();
    }

    // First member carrying the token, undefined when there is none.
    const owner = conference.members.find((candidate) => candidate.token === token);

    return callback(null, owner);
  });
}

/**
 * Update one field of a conference member.
 *
 * Only the fields listed in the local `updates` table can be changed
 * (currently `displayName`); any other field name is rejected.
 *
 * @param {Conference} conference - conference holding the member (its `_id` is used)
 * @param {Member} member - member to update, matched on `id` and `objectType`
 * @param {String} field - name of the field to update
 * @param {String} value - new value of the field
 * @param {Function} callback - called with an error when the field is not
 *   updatable, otherwise handed over to `Conference.update`
 * @return {*}
 */
function updateMemberField(conference, member, field, value, callback) {
  const updates = { displayName: { $set: { 'members.$.displayName': value } } };

  // Own-property check: names inherited from Object.prototype (toString,
  // constructor, ...) must not be mistaken for updatable fields.
  if (!Object.prototype.hasOwnProperty.call(updates, field)) {
    return callback(new Error('Can not update the field ' + String(field)));
  }

  Conference.update(
    { _id: conference._id, members: { $elemMatch: { id: member.id, objectType: member.objectType } } },
    updates[field],
    // The positional `$` operator must not be combined with an upsert: when
    // nothing matches, `$` would be used as a field name in the inserted document.
    { upsert: false },
    callback
  );
}

/**
 * Load a conference from its id, populating `attendees.user` with the
 * matching `User` documents.
 *
 * @param {string} id - identifier given to `Conference.findById`
 * @param {function} callback - handed over to the query `exec`
 */
function loadWithAttendees(id, callback) {
  const query = Conference.findById(id)
    .sort('-timestamps.creation')
    .populate('attendees.user', null, 'User');

  query.exec(callback);
}

/**
 * List every conference sorted on `-timestamps.creation`, populating
 * `creator` with the matching `User` documents.
 *
 * @param {function} callback - handed over to the query `exec`
 */
function list(callback) {
  const query = Conference.find()
    .sort('-timestamps.creation')
    .populate('creator', null, 'User');

  query.exec(callback);
}

/**
 * Check if a user is the creator of a conference.
 *
 * The check relies on `conference.creator.equals(user._id)`. A conference
 * without any `creator` is reported as not created by the user instead of
 * throwing.
 *
 * @param {object} conference - conference to check
 * @param {object} user - user to check (its `_id` is compared)
 * @param {function} callback - called with (err) or (null, boolean)
 * @return {*}
 */
function userIsConferenceCreator(conference, user, callback) {
  if (!user) {
    return callback(new Error('Undefined user'));
  }

  if (!conference) {
    return callback(new Error('Undefined conference'));
  }

  // No creator recorded: nobody can be the creator, and calling `.equals`
  // on a missing value would throw synchronously instead of using the callback.
  if (!conference.creator) {
    return callback(null, false);
  }

  return callback(null, conference.creator.equals(user._id));
}

/**
 * Check if a user is one of the members of a conference.
 *
 * The conference is queried by `_id` with a projection that only keeps the
 * member matching the user `id` and `objectType`.
 *
 * @param {object} conference - conference to check (its `_id` is used)
 * @param {object} user - user to look for (`id` and `objectType`)
 * @param {function} callback - called with (err) or (null, boolean)
 * @return {*}
 */
function userIsConferenceMember(conference, user, callback) {
  if (!user) {
    return callback(new Error('Undefined user'));
  }

  if (!conference) {
    return callback(new Error('Undefined conference'));
  }

  const query = { _id: conference._id };
  const projection = { members: { $elemMatch: { id: user.id, objectType: user.objectType } } };

  Conference.findOne(query, projection).exec((err, conf) => {
    if (err) {
      return callback(err);
    }

    // Always answer with a real boolean, including when the conference does
    // not exist or comes back without a `members` array.
    const isMember = Boolean(conf && conf.members && conf.members.length > 0);

    return callback(null, isMember);
  });
}

/**
 * Tell whether a user is allowed to join a conference.
 *
 * Conferences are public for now: as long as both arguments are defined the
 * answer is always `true`.
 *
 * @param {object} conference - conference the user wants to join
 * @param {object} user - user who wants to join
 * @param {function} callback - called with (err) or (null, true)
 * @return {*}
 */
function userCanJoinConference(conference, user, callback) {
  let missing = null;

  if (!user) {
    missing = 'Undefined user';
  } else if (!conference) {
    missing = 'Undefined conference';
  }

  return missing ? callback(new Error(missing)) : callback(null, true);
}

/**
 * Add a user to the members of a conference if not already in.
 *
 * The conference is reloaded from its `_id`. When the user is not yet a
 * member, its `status` defaults to offline and its `displayName` to its `id`
 * (both are set on the given `user` object), then it is pushed to the members
 * and the conference is saved.
 *
 * @param {Conference} conference - conference to join (its `_id` is used)
 * @param {User} user - user to add
 * @param {Function} callback - called with (err), with no argument when the
 *   user is already a member, or with the result of the conference save
 * @return {*}
 */
function addUser(conference, user, callback) {
  if (!user) {
    return callback(new Error('Undefined user'));
  }

  if (!conference) {
    return callback(new Error('Undefined conference'));
  }

  get(conference._id, function(err, conf) {
    if (err) {
      return callback(err);
    }

    if (!conf) {
      // The id belongs in the message: extra Error arguments are not appended to it.
      return callback(new Error('No such conference ' + conference._id));
    }

    userIsConferenceMember(conference, user, function(err, isMember) {
      if (err) {
        return callback(err);
      }

      if (isMember) {
        return callback();
      }

      user.status = user.status || MEMBER_STATUS.OFFLINE;
      user.displayName = user.displayName || user.id;

      conf.members.push(user);
      return conf.save(callback);
    });
  });
}

/**
 * Join a conference: add the user to the members if not already in, set the
 * member status to online, then forward a join event from the local to the
 * global pubsub.
 *
 * @param {Conference} conference - conference to join (its `_id` is used)
 * @param {User} user - user joining, matched on `id` and `objectType`
 * @param {function} callback - called with (err) or (null, updateResult)
 * @return {*}
 */
function join(conference, user, callback) {
  if (!user) {
    return callback(new Error('Undefined user'));
  }

  if (!conference) {
    return callback(new Error('Undefined conference'));
  }

  addUser(conference, user, function(err) {
    if (err) {
      return callback(err);
    }
    Conference.update(
      { _id: conference._id, members: { $elemMatch: { id: user.id, objectType: user.objectType } } },
      { $set: { 'members.$.status': MEMBER_STATUS.ONLINE } },
      // The positional `$` operator must not be combined with an upsert: when
      // nothing matches, `$` would be used as a field name in the inserted document.
      { upsert: false },
      function(err, updated) {
        if (err) {
          logger.error('Error while updating the conference', err);
          return callback(err);
        }

        localpubsub.topic(EVENTS.join).forward(globalpubsub, { conference: conference, user: user });

        return callback(null, updated);
      });
  });
}

/**
 * Leave a conference: set the member status to offline, then forward a leave
 * event from the local to the global pubsub.
 *
 * The user must already be a member of the conference.
 *
 * @param {Conference} conference - conference to leave (its `_id` is used)
 * @param {Tuple} user - user leaving, matched on `id` and `objectType`
 * @param {function} callback - called with (err) or (null, updateResult)
 * @return {*}
 */
function leave(conference, user, callback) {
  if (!user) {
    return callback(new Error('Undefined user'));
  }

  if (!conference) {
    return callback(new Error('Undefined conference'));
  }

  userIsConferenceMember(conference, user, function(err, isMember) {
    if (err) {
      return callback(err);
    }

    if (!isMember) {
      return callback(new Error('User is not member of this conference'));
    }
    Conference.update(
      { _id: conference._id, members: { $elemMatch: { id: user.id, objectType: user.objectType } } },
      { $set: { 'members.$.status': MEMBER_STATUS.OFFLINE } },
      // The positional `$` operator must not be combined with an upsert: when
      // nothing matches, `$` would be used as a field name in the inserted document.
      { upsert: false },
      function(err, updated) {
        if (err) {
          return callback(err);
        }

        localpubsub.topic(EVENTS.leave).forward(globalpubsub, { conference: conference, user: user });

        return callback(null, updated);
      });
  });
}

/**
 * Hook for room join event called from linagora.esn.webrtc module.
 *
 * The room id is used as the conference id and the user id as the id of a
 * member sub-document of that conference. When no such member exists the
 * callback is called without argument and nothing is joined.
 *
 * @param {String} roomId - id of the room, used to load the conference
 * @param {String} userId - id looked up in the conference members
 * @param {Function} callback - called with (err), with no argument when the
 *   user is unknown, or with the result of `join`
 * @return {*}
 */
function onRoomJoin(roomId, userId, callback) {
  get(roomId, function(err, conference) {
    if (err) {
      logger.error('Error while getting room', err);
      return callback(new Error(`Error while getting conference from room ${roomId}`));
    }

    if (!conference) {
      logger.info('Can not find conference from room', roomId);
      // The room id belongs in the message: extra Error arguments are not appended to it.
      return callback(new Error(`Can not find conference from room ${roomId}`));
    }

    var user = conference.members.id(userId);
    if (!user) {
      logger.info('No valid user found for room %s with id %s', roomId, userId);
      return callback();
    }

    return join(conference, user, callback);
  });
}

/**
 * Hook for room leave event called from linagora.esn.webrtc module.
 *
 * The room id is used as the conference id and the user id as the id of a
 * member sub-document of that conference. When no such member exists the
 * callback is called without argument and nothing is left.
 *
 * @param {String} roomId - id of the room, used to load the conference
 * @param {String} userId - id looked up in the conference members
 * @param {Function} callback - called with (err), with no argument when the
 *   user is unknown, or with the result of `leave`
 * @return {*}
 */
function onRoomLeave(roomId, userId, callback) {
  get(roomId, function(err, conference) {
    if (err) {
      logger.error('Error while getting room %s', roomId, err);
      // The room id belongs in the message: extra Error arguments are not appended to it.
      return callback(new Error(`Error while getting conference from room ${roomId}`));
    }

    if (!conference) {
      logger.info('Can not find conference from room %s', roomId);
      return callback(new Error(`Can not find conference from room ${roomId}`));
    }

    var user = conference.members.id(userId);
    if (!user) {
      logger.info('No valid user found for room %s with id %s', roomId, userId);
      return callback();
    }

    return leave(conference, user, callback);
  });
}

/**
 * Give the garbage time to live of a conference.
 *
 * @return {Promise} GARBAGE_TTL wrapped by `q`
 */
function getGarbageTTL() {
  const garbageTtl = q(GARBAGE_TTL);

  return garbageTtl;
}

/**
 * Give the time to live of a conference.
 *
 * @return {Promise} TTL wrapped by `q`
 */
function getTTL() {
  const ttl = q(TTL);

  return ttl;
}

/**
 * Tell whether a conference is still active.
 *
 * A conference is active when both conditions hold:
 * - it has no history, or its latest history entry was published less than
 *   the garbage ttl ago;
 * - at least one member is in state MEMBER_STATUS.ONLINE, or the conference
 *   was created less than the ttl ago.
 *
 * @param {Conference} conference to check for active state
 * @return {Promise} fulfilled with a boolean telling whether the conference is still active
 */
function isActive(conference) {
  const isOnline = (member) => member.status === MEMBER_STATUS.ONLINE;

  // True while the creation date is less than `ttl` milliseconds in the past.
  const createdWithin = (ttl) => {
    const created = conference.timestamps.created;

    return Date.now() - created.getTime() - ttl < 0;
  };

  // True when there is no history, or when the latest entry is less than
  // `garbageTTL` milliseconds old.
  const usedWithin = (garbageTTL) => {
    const history = conference.history;

    if (!history || !history.length) {
      return true;
    }

    const latest = history[history.length - 1];

    return Date.now() - latest.published.getTime() - garbageTTL < 0;
  };

  return q.all([getTTL(), getGarbageTTL()]).then(([ttl, garbageTTL]) => {
    const active = usedWithin(garbageTTL) &&
      (conference.members.some(isOnline) || createdWithin(ttl));

    return q(active);
  });
}

/**
 * Build (without saving) the archive document of a conference.
 *
 * The archive holds the plain-object form of the conference, where `_id` is
 * replaced by `initial_id` and `timestamps.archived` is set to the current date.
 *
 * @param {Conference} conference - conference to archive
 * @return {ConferenceArchive} a new, unsaved ConferenceArchive instance
 */
function _buildConferenceArchive(conference) {
  const originalId = conference._id;
  const archiveData = conference.toObject();

  // The archive gets its own _id: keep the original one under initial_id.
  delete archiveData._id;
  archiveData.initial_id = originalId;
  archiveData.timestamps.archived = new Date();

  return new ConferenceArchive(archiveData);
}

/**
 * Archive a conference.
 *
 * A ConferenceArchive is built from the conference and saved; once saved, the
 * original conference is removed. A failure of either step is logged and
 * rejects the returned promise.
 *
 * @param {Conference} conference to remove
 * @return {Promise} fulfilled with the conferenceArchive mongoose document
 */
function archive(conference) {
  const deferred = q.defer();
  const pendingArchive = _buildConferenceArchive(conference);

  pendingArchive.save((saveError, savedArchive) => {
    if (saveError) {
      logger.error('Unable to save conference archive %s', pendingArchive.toObject(), saveError);
      return deferred.reject(saveError);
    }

    // The archive is safely stored: the live conference can now go away.
    Conference.remove({ _id: savedArchive.initial_id }, (removeError) => {
      if (removeError) {
        logger.error('Unable to remove inactive conference', removeError);
        return deferred.reject(removeError);
      }

      logger.debug('conference ' + savedArchive.initial_id + ' has been archived, archive _id=' + savedArchive._id);
      return deferred.resolve(savedArchive);
    });
  });

  return deferred.promise;
}

/**
 * Create and save a new conference, then publish its creation event.
 * @type {create}
 */
module.exports.create = create;
/**
 * Push an event entry to the history of a conference.
 * @type {addHistory}
 */
module.exports.addHistory = addHistory;
/**
 * Add members to a conference as invited, then send them invitations.
 * @type {invite}
 */
module.exports.invite = invite;

/**
 * Publish the invitation of a single conference member.
 * @type {sendInvitation}
 */
module.exports.sendInvitation = sendInvitation;

/**
 * Get a conference by its id.
 * @type {get}
 */
module.exports.get = get;

/**
 * Get the conference having a member with the given token.
 * @type {getFromMemberToken}
 */
module.exports.getFromMemberToken = getFromMemberToken;

/**
 * Get the conference member owning the given token.
 * @type {getMemberFromToken}
 */
module.exports.getMemberFromToken = getMemberFromToken;

/**
 * Load a conference by id with `attendees.user` populated.
 * @type {loadWithAttendees}
 */
module.exports.loadWithAttendees = loadWithAttendees;
/**
 * List all the conferences with `creator` populated.
 * @type {list}
 */
module.exports.list = list;
/**
 * Check if a user is the creator of a conference.
 * @type {userIsConferenceCreator}
 */
module.exports.userIsConferenceCreator = userIsConferenceCreator;
/**
 * Check if a user is one of the members of a conference.
 * @type {userIsConferenceMember}
 */
module.exports.userIsConferenceMember = userIsConferenceMember;
/**
 * Check if a user can join a conference (always true for defined arguments).
 * @type {userCanJoinConference}
 */
module.exports.userCanJoinConference = userCanJoinConference;
/**
 * Add a user to a conference if needed, set it online and publish the join event.
 * @type {join}
 */
module.exports.join = join;
/**
 * Room join hook: resolves the conference and member, then calls join.
 * @type {onRoomJoin}
 */
module.exports.onRoomJoin = onRoomJoin;
/**
 * Room leave hook: resolves the conference and member, then calls leave.
 * @type {onRoomLeave}
 */
module.exports.onRoomLeave = onRoomLeave;
/**
 * Set a conference member offline and publish the leave event.
 * @type {leave}
 */
module.exports.leave = leave;

/**
 * Get a conference member from its (id, objectType) tuple.
 * @type {getMember}
 */
module.exports.getMember = getMember;

/**
 * Add a user to the members of a conference if not already in.
 * @type {addUser}
 */
module.exports.addUser = addUser;

/**
 * Update one whitelisted field of a conference member.
 * @type {updateMemberField}
 */
module.exports.updateMemberField = updateMemberField;

/**
 * Names of the pubsub topics used for conference events.
 * @type {{join: string, leave: string, invite: string, create: string}}
 */
module.exports.EVENTS = EVENTS;

/**
 * @type {hash} The possible statuses for conference members
 */
module.exports.MEMBER_STATUS = MEMBER_STATUS;

/**
 * Tell, through a promise, whether a conference is still active.
 * @type {Function}
 */
module.exports.isActive = isActive;

/**
 * Save a ConferenceArchive of a conference, then remove the conference.
 * @type {Function}
 */
module.exports.archive = archive;

/**
 * Get, through a promise, the time to live of a conference.
 * NOTE(review): getGarbageTTL is not exported alongside getTTL; confirm this is intentional.
 * @type {Function}
 */
module.exports.getTTL = getTTL;