ysugimoto/node-memcached-client

View on GitHub
libs/memcached.js

Summary

Maintainability
F
3 days
Test Coverage
'use strict';

const EventEmitter = require('events').EventEmitter;
const Pool = require('./pool.js');
const Message = require('./message.js');
const Logger = require('./log.js');

const DEFAULT_OPTIONS = {
  host: 'localhost',
  port: 11211,
  autoreconnect: false,
  commandTimeout: 2000,
  reconnectDuration: 2000,
  maxRetryConnectCount: 10
};

/**
 * Memcached client
 *
 * Specs:
 *   - Manage connection pools
 *   - Support automatic reconnection
 *   - Command queueing, and run when reconnected
 *   - Non-exiting when connection closed unexpectedly
 *
 * Protocol specification
 * @see https://github.com/memcached/memcached/blob/master/doc/protocol.txt
 */
class Memcached extends EventEmitter {

  /**
   v Constructor
   * @param {Object} options - Connect options
   */
  constructor(options) {
    super();

    this.queue = [];
    this.conn = null;
    this.running = false;
    this.clientClosed = false;

    this.options = Object.assign(DEFAULT_OPTIONS, options || {});
  }

  /**
   * Validate key signature
   *
   * @param {String} key key name
   * @return {Void} -
   * @throws Error
   */
  validateKey(key) {
    if (key.length > 250) {
      throw new Error('Key size limit exceeded. Key size must be under the 250 bytes');
    }
    // Key string must not contain control characters and whitespece
    if (/[\u0000-\u001F|\u007f|\u0080-\u009f|\s]/.test(key)) {
      throw new Error('Invalid key. Key must not contain control characters and whitespace');
    }
  }

  /**
   * Connect to memcached server
   *
   * @return {Void} -
   */
  connect() {
    return new Promise(resolve => {
      const c = Pool.pull(this.options.host, this.options.port, this.options);
      this.conn = c.connection;

      if (c.created) {
        this.conn.on('mc.error', () => this.emit('mc.error'));
        this.conn.on('timeout', () => this.emit('timeout'));
        this.conn.on('connect', () => {
          this.emit('connect');
          resolve(this);
        });
        this.conn.on('close', () => this.emit('close'));
      } else {
        // We need to a bit delay to emit connect event if connection returns by pool
        setTimeout(() => {
          this.emit('connect');
          resolve(this);
        }, 10);
      }
    });
  }

  /**
   * Close connection from client
   *
   * @return {Void} -
   */
  close() {
    try {
      // Not that this closing connect is expected by user.
      // So we do not try to reconnect
      this.conn.close();
    } catch (e) {
      Logger.error(e);
    }
  }

  /**
   * Send "add" command
   *
   * @param {String} key cache key
   * @param {String|Buffer} value store value
   * @param {Booelan} isCompress flag of data should be compressed
   * @param {Number} expires time to cache has expired (if supplied zero, the value is persitent)
   * @return {Promise} -
   */
  add(key, value, isCompress = 0, expires = 0) {
    return this.store('add', key, value, isCompress, expires);
  }

  /**
   * Send "replace" command
   *
   * @param {String} key cache key
   * @param {String|Buffer} value store value
   * @param {Booelan} isCompress flag of data should be compressed
   * @param {Number} expires time to cache has expired (if supplied zero, the value is persitent)
   * @return {Promise} -
   */
  replace(key, value, isCompress = 0, expires = 0) {
    return this.store('replace', key, value, isCompress, expires);
  }

  /**
   * Send "append" command
   *
   * @param {String} key cache key
   * @param {String|Buffer} value store value
   * @param {Booelan} isCompress flag of data should be compressed
   * @param {Number} expires time to cache has expired (if supplied zero, the value is persitent)
   * @return {Promise} -
   */
  append(key, value, isCompress = 0, expires = 0) {
    return this.store('append', key, value, isCompress, expires);
  }

  /**
   * Send "prepend" command
   *
   * @param {String} key cache key
   * @param {String|Buffer} value store value
   * @param {Booelan} isCompress flag of data should be compressed
   * @param {Number} expires time to cache has expired (if supplied zero, the value is persitent)
   * @return {Promise} -
   */
  prepend(key, value, isCompress = 0, expires = 0) {
    return this.store('prepend', key, value, isCompress, expires);
  }

  /**
   * Send "set" command
   *
   * @param {String} key cache key
   * @param {String|Buffer} value store value
   * @param {Booelan} isCompress flag of data should be compressed
   * @param {Number} expires time to cache has expired (if supplied zero, the value is persitent)
   * @return {Promise} -
   */
  set(key, value, isCompress = 0, expires = 0) {
    return this.store('set', key, value, isCompress, expires);
  }

  /**
   * Create stored command and send, and handle reply
   *
   * @param {String} commandName command name
   * @param {String} key cache key
   * @param {String|Buffer} value store value
   * @param {Booelan} isCompress flag of data should be compressed
   * @param {Number} expires time to cache has expired (if supplied zero, the value is persitent)
   * @return {Promise} -
   * @resolve {Void}
   * @reject {Void}
   */
  store(commandName, key, value, isCompress, expires) {
    this.validateKey(key);
    const stringValue = value.toString();
    const byteSize = (value instanceof Buffer) ? value.byteLength : Buffer.byteLength(stringValue, 'utf8');
    const command = [
      `${commandName} ${key} ${isCompress ? 1 : 0} ${expires} ${byteSize}`,
      (value instanceof Buffer) ? value.toString('utf8') : stringValue
    ];
    return this.conn.command(command)
      .then(message => {
        const code = message.code;
        switch (code) {
          case Message.EXISTS:
          case Message.STORED:
          case Message.NOT_STORED:
            return Promise.resolve(code);
          default:
            return Promise.reject(code);
        }
      })
    ;
  }

  /**
   * Send "cas" command
   *
   * @param {String} key cache key
   * @param {String|Buffer} value store value
   * @param {Booelan} isCompress flag of data should be compressed
   * @param {Number} expires time to cache has expired (if supplied zero, the value is persitent)
   * @param {String} unique cas unique
   * @return {Promise} -
   * @resolve {String} reply code
   * @reject {String} reply code
   */
  cas(key, value, isCompress = 0, expires = 0, unique = '') {
    this.validateKey(key);
    const byteSize = (value instanceof Buffer) ? Buffer.length : Buffer.byteLength(value, 'utf8');
    const command = [
      `cas ${key} ${isCompress ? 1 : 0} ${expires} ${byteSize} ${unique}`,
      (value instanceof Buffer) ? value.toString('utf8') : value
    ];
    return this.conn.command(command)
      .then(message => {
        const code = message.code;
        switch (code) {
          case Message.STORED:
            return Promise.resolve(code);
          default:
            return Promise.reject(code);
        }
      })
    ;
  }

  /**
   * Get cache data from supplied key
   *
   * @param {Array} keys cache key
   * @return {Promise} -
   * @resolve {String|Object|null} cache data
   *   If single key was supplied, return string. Otherwise return key mapped Object
   * @reject {Void}
   */
  get(...keys) {
    keys.forEach(k => this.validateKey(k));
    return this.conn.command([`get ${keys.join(' ')}`])
      .then(message => {
        const code = message.code;
        switch (code) {
          case Message.END:
            return Promise.resolve(null);
          case Message.ERROR:
          case Message.SERVER_ERROR:
          case Message.CLIENT_ERROR:
            return Promise.reject(code);
          default:
            if (keys.length === 1) {
              return Promise.resolve(message.getValue());
            }
            const values = message.getBulkValues();
            keys.forEach(k => {
              if (!values.hasOwnProperty(k)) {
                values[k] = null;
              }
            });
            return Promise.resolve(values);
        }
      })
    ;
  }

  /**
   * Get cache data from supplied key with cas unique
   *
   * @param {...String} key cache key list
   * @return {Promise} -
   * @resolve {Object|null} cache data map by key
   * @reject {Void}
   */
  gets(...keys) {
    keys.forEach(k => this.validateKey(k));
    return this.conn.command([`gets ${keys.join(' ')}`])
      .then(message => {
        const code = message.code;
        switch (code) {
          case Message.END:
            return Promise.resolve(null);
          case Message.ERROR:
          case Message.SERVER_ERROR:
          case Message.CLIENT_ERROR:
            return Promise.reject(code);
          default:
            if (keys.length === 1) {
              return Promise.resolve(message.getObjectValue());
            }
            const values = message.getBulkObjectValues();
            keys.forEach(k => {
              if (!values.hasOwnProperty(k)) {
                values[k] = null;
              }
            });
            return Promise.resolve(values);
        }
      })
    ;
  }

  /**
   * Delete cache data from supplied key
   *
   * @param {String} key cache key
   * @return {Promise} -
   * @resolve {Void}
   * @reject {Void}
   */
  delete(key) {
    this.validateKey(key);
    return this.conn.command([`delete ${key}`])
      .then(message => {
        const code = message.code;
        switch (code) {
          case Message.DELETED:
            return Promise.resolve(code);
          default:
            return Promise.reject(code);
        }
      })
    ;
  }

  /**
   * Increment data from supplied key
   *
   * @param {String} key cache key
   * @param {Number} value increment value
   * @return {Promise} -
   * @resolve {Number} Incremented value
   * @reject {Void}
   */
  incr(key, value = 1) {
    return this.incrOrDecr('incr', key, value);
  }

  /**
   * Decrement data from supplied key
   *
   * @param {String} key cache key
   * @param {Number} value decrement value
   * @return {Promise} -
   * @resolve {Number} Decremented value
   * @reject {Void}
   */
  decr(key, value = 1) {
    return this.incrOrDecr('decr', key, value);
  }

  /**
   * Send "incr" or "decr" command
   *
   * @param {String} commandName command name
   * @param {String} key cache key
   * @param {Number} value increment or decrement value
   * @return {Promise} -
   * @resolve {Number}
   * @reject {Void}
   */
  incrOrDecr(commandName, key, value) {
    this.validateKey(key);
    const command = [`${commandName} ${key} ${value}`];
    return this.conn.command(command)
      .then(message => {
        const result = parseInt(message.rawData, 10);
        if (!isNaN(result)) {
          return Promise.resolve(result);
        }
        return Promise.reject(message.code);
      })
    ;
  }

  /**
   * Update expired time for key
   *
   * @param {String} key cache key
   * @param {Number} expires update expired time
   * @return {Promise} -
   * @resolve {Void}
   * @reject {Void}
   */
  touch(key, expires) {
    this.validateKey(key);
    const command = [`touch ${key} ${expires}`];
    return this.conn.command(command)
      .then(message => {
        const code = message.code;
        switch (code) {
          case Message.TOUCHED:
            return Promise.resolve(code);
          default:
            return Promise.reject(code);
        }
      })
    ;
  }
}

module.exports = Memcached;