azukiapp/azk

View on GitHub
src/agent/balancer.js

Summary

Maintainability
B
5 hrs
Test Coverage
import { _, t, path, fsAsync, config, log } from 'azk';
import { async, defer, promisifyAll, thenAll, promiseResolve } from 'azk/utils/promises';

import { lazy_require } from 'azk';
import { net } from 'azk/utils';
import { Tools } from 'azk/agent/tools';
import { AgentStartError } from 'azk/utils/errors';

var lazy = lazy_require({
  forever        : 'forever-monitor',
  Manifest       : ['azk/manifest'],
  Client         : ['azk/agent/client'],
  MemoryStream   : 'memorystream',
  MemcachedDriver: 'memcached',
});

// TODO: Replace forever for a better solution :/
var Balancer = {
  memcached : null,
  hipache   : null,
  mem_client: null,

  running: {
    dns: false,
    'balancer-redirect': false,
  },

  // Hipache database controll
  get memCached() {
    if (!this.mem_client) {
      var socket = config('paths:memcached_socket');
      this.mem_client = new lazy.MemcachedDriver(socket);
      this.mem_client = promisifyAll(this.mem_client);
    }
    return this.mem_client;
  },

  removeAll(host) {
    var key = 'frontend:' + host;
    return this.memCached.deleteAsync(key);
  },

  getBackends(host) {
    var key = 'frontend:' + host;
    return this.memCached.getAsync(key).then((entries) => {
      return entries ? entries : [host];
    });
  },

  addBackend(hosts, backend) {
    return async(this, function* () {
      for (var host of (_.isArray(hosts) ? hosts : [hosts])) {
        var key = 'frontend:' + host;
        var entries = yield this.getBackends(host);
        entries = this._removeEntry(entries, backend);
        entries.push(backend);
        yield this.memCached.setAsync(key, entries, 0);
      }
    });
  },

  removeBackend(hosts, backend) {
    return async(this, function* () {
      for (var host of (_.isArray(hosts) ? hosts : [hosts])) {
        var key = 'frontend:' + host;
        var entries = yield this.getBackends(host);
        entries = this._removeEntry(entries, backend);
        yield this.memCached.setAsync(key, entries, 0);
      }
    });
  },

  // Balancer service and subsystems controll
  start(vm_enabled = true) {
    return Tools.async_status("balancer", this, function* () {
      if (!this.isRunnig()) {
        var socket = config('paths:memcached_socket');
        var ip     = net.calculateGatewayIp(config("agent:vm:ip"));
        var port   = yield net.getPort();

        if (vm_enabled) {
          // Subsistems : dns and balancer redirect
          yield this.start_dns(ip, port);
          yield this.start_redirect(ip, port);
        }

        // Memcached and Hipache
        yield this.start_memcached(socket);
        yield this.start_hipache(vm_enabled ? ip : null, port, socket);
      }
    });
  },

  start_dns() {
    return this._run_system('dns', {
      wait: false,
    });
  },

  start_redirect(ip, port) {
    return this._run_system('balancer-redirect', {
      wait: true,
      envs: {
        BALANCER_IP: ip,
        BALANCER_PORT: port,
      }
    });
  },

  start_hipache(ip, port, socket) {
    return async(this, function* () {
      var pid  = config("paths:hipache_pid");
      var file = yield this._check_config(ip, port, socket);
      var cmd  = [ 'nvm', 'hipache', '--config', file ];
      var name = "hipache";
      var child = yield this._start_service(name, cmd, pid);
      this.hipache = child;
      log.info("hipache started in %s port with file config", port, file);
      this._handleChild(name, child);
      return promiseResolve({});
    });
  },

  start_memcached(socket) {
    return async(this, function* () {
      var pid  = config("paths:memcached_pid");
      var cmd  = [ 'nvm', 'memcachedjs', '--socket', socket ];
      var name = "memcached";

      // Remove socket before start
      var socket_exists = yield fsAsync.exists(socket);
      if (socket_exists) {
        yield fsAsync.unlink(socket);
      }

      var child = yield this._start_service(name, cmd, pid);
      this.memcached = child;
      log.info("memcachedjs started in socket: ", socket);
      this._handleChild(name, child);
      return promiseResolve({});
    });
  },

  stop(skip_containers = false) {
    if (this.isRunnig()) {
      log.debug("call to stop balancer");
      return Tools.async_status("balancer", this, function* (change_status) {
        if (!skip_containers) {
          yield thenAll([
            this._stop_system('balancer-redirect', change_status),
            this._stop_system('dns', change_status),
          ]);
        }
        yield this._stop_sub_service("hipache", change_status);
        yield this._stop_sub_service("memcached", change_status);
      });
    } else {
      return promiseResolve();
    }
  },

  isRunnig() {
    return (
      (this.hipache && this.hipache.running) ||
      (this.memcached && this.memcached.running)
    );
  },

  _removeEntry(entries, backend) {
    return _.filter(entries, (entry) => { return entry != backend; });
  },

  _getSystem(system) {
    var manifest = new lazy.Manifest(config('paths:shared'), true);
    return manifest.system(system, true);
  },

  _waitDocker(timeout_max = 20000) {
    var docker_host = config("docker:host");
    var promise = net.waitService(docker_host, {
      timeout: timeout_max,
      context: "balancer",
      publish_retry: false,
    });
    return promise.then((success) => {
      if (!success) {
        throw new AgentStartError(t('errors.connect_docker_unavailable'));
      }
      return success;
    });
  },

  // TODO: check if system is running
  _run_system(system_name, options = {}) {
    return Tools.async_status("balancer", this, function* (change_status) {
      if (this.running[system_name]) {
        return true;
      }
      var system = this._getSystem(system_name);

      // Wait docker
      yield this._waitDocker();

      // Save outputs to use in error
      var output = "";
      options.stdout = new lazy.MemoryStream();
      options.stdout.on('data', (data) => {
        output += data.toString();
      });

      yield system.stop();
      change_status("starting_" + system_name);
      var result = yield system.scale(1, options);

      if (!result) {
        throw new Error(`Fail to start balancer (${system_name}): ${output}`);
      }

      // Save state
      change_status("started_" + system_name);
      this.running[system_name] = true;
    });
  },

  _stop_system(system_name, change_status) {
    return async(this, function* () {
      if (!this.running[system_name]) {
        return false;
      }

      var system = this._getSystem(system_name);

      // Wait docker
      try {
        yield this._waitDocker(30000);

        // Stop
        change_status("stopping_" + system_name);
        yield system
          .stop()
          .catch((err) => {
            try {
              log.error(err);
              change_status("error", err);
            } catch (err) {}
            return true;
          });
        change_status("stopped_" + system_name);
      } catch (err) {
        var msg = err.stack ? err.stack : err.toString();
        log.warn(`[agent] Error to stop balance system ${system_name}`, msg);
      }

      // Save state
      this.running[system_name] = false;
    });
  },

  _handleChild(name, child) {
    child.on('stop', () => {
      log.info(name + ' stopped');
    });

    // Log child error if exited
    child.on('exit:code', (code) => {
      if (code && code !== 0) {
        log.error(name + ' exit code: ' + code);
      }
    });

    // Log child outpus
    var info = (data) => {
      log.info(name + ': %s', data.toString().trim());
    };
    child.on('stdout', info);
    child.on('stderr', info);
  },

  _start_service(name, cmd, pid) {
    cmd = [path.join(config('paths:azk_root'), 'bin', 'azk'), ...cmd];
    var options = {
      max    : 1,
      silent : true,
      fork   : true,
      pidFile: pid,
      detached: false,
    };

    return Tools.defer_status("balancer", (resolve, reject, change_status) => {
      // Log and post msgs
      log.info("starting " + name);
      change_status("starting_" + name);

      var child = lazy.forever.start(cmd, options);
      child.on('exit', () => {
        reject();
        lazy.Client.stop();
      });
      child.on('start', () => resolve(child));

      change_status("started_" + name);
    });
  },

  _stop_sub_service(sub, change_status) {
    return defer((resolve) => {
      var service = this[sub];
      if (service && service.running) {

        change_status("stopping_" + sub);
        service.on('stop', () => {
          change_status("stopped_" + sub);
          resolve();
        });

        service.on('exit', () => {
          change_status("exited_" + sub);
          resolve();
        });

        service.kill();
      } else {
        resolve();
      }
    });
  },

  _check_config(ip, port, memcached_socket) {
    var file = config('paths:balancer_file');
    var log  = path.join(config('paths:logs'), "hipache_access.log");
    var bind = ["127.0.0.1", "::1"];

    // Only ip not a null
    if (ip) { bind.push(ip); }

    var data = {
      user: process.getuid(),
      server: {
        accessLog: log,
        workers:        config('agent:balancer:workers'),
        maxSockets:     config('agent:balancer:worker_max_sockets'),
        tcpTimeout:     config('agent:balancer:tcp_timeout'),
        deadBackendTTL: config('agent:balancer:dead_backend_ttl'),
      },
      http: { port, bind },
      driver: ["memcached://" + memcached_socket]
    };

    // set content
    return fsAsync.writeFile(file, JSON.stringify(data, null, '  ')).then(function () {
      return file;
    });
  }
};

export { Balancer };