azukiapp/azk

View on GitHub
src/agent/client.js

Summary

Maintainability
A
1 hr
Test Coverage
import { _, lazy_require, log } from 'azk';
import { publish } from 'azk/utils/postal';
import { defer, promiseResolve, promisify } from 'azk/utils/promises';
import { config, set_config } from 'azk';
import { Agent } from 'azk/agent';
import { AgentNotRunning } from 'azk/utils/errors';

var lazy = lazy_require({
  uuid      : 'node-uuid',
  url       : 'url',
  WebSocket : 'ws',
  request   : 'request',
});

var HttpClient = {
  url(path) {
    return `http:\/\/unix:${config('paths:api_socket')}:${path}` ;
  },

  request(method, path, opts = {}) {
    method = promisify(lazy.request[method], { multiArgs: true, context: lazy.request });
    return method(_.defaults(opts, {
      url : this.url(path),
      json: true,
    }));
  },

  get(...args) {
    return this.request('get', ...args);
  }
};

var WebSocketClient = {
  ws_path: '/cli',
  _ws: null,
  _status: null,
  _buffer: [],
  _cb: {},

  init() {
    return defer((resolve, reject) => {
      if (this._status === 'connecting' || this._status === 'connected') {
        return resolve();
      }

      this._status = 'connecting';

      var socket_address  = lazy.url.parse(`ws+unix:\/\/${config('paths:api_socket')}`);
      socket_address.path = this.ws_path;

      this._ws = new lazy.WebSocket(socket_address);

      this._ws.on('open', () => {
        this._status = 'connected';
        log.debug('Websocket connected.');
        while (this._buffer.length > 0 && this._ws) {
          var item = this._buffer.shift();
          var message = item.message;
          message.id = this._generate_msg_id();
          this._cb[message.id] = item.callback;
          this._ws.send(JSON.stringify(message));
        }
        return resolve();
      });

      this._ws.on('close', () => {
        this._status = 'closed';
        log.debug('Websocket closed.');
      });

      this._ws.on('error', (err) => {
        log.error('Error on websocket:', err);
        reject(err);
      });

      this._ws.on('message', (data) => {
        var message = JSON.parse(data);
        if (!this._cb[message.id]) { return; }
        var callback = this._cb[message.id];
        callback(message, () => {
          delete this._cb[message.id];
        });
      });
    });
  },

  close() {
    return defer((resolve, reject) => {
      let status = this._status;
      if (status !== 'closed' && status !== 'closing') {
        this._ws.removeAllListeners('error');
        this._ws.removeAllListeners('close');

        this._ws.on('close', () => {
          this._status = 'closed';
          log.debug('Websocket closed.');
          resolve(true);
        });

        this._ws.on('error', (err) => {
          log.error('Error closing websocket:', err);
          reject(err);
        });

        this._status = 'closing';
        this._ws.terminate();
      } else {
        resolve(false);
      }
    });
  },

  send(message, callback = null, retry = 0) {
    this.init()
      .catch((err) => {
        log.error('Failed to init websocket: ', err);
      })
      .then(() => {
        if (this._status == 'connected') {
          message.id = this._generate_msg_id();
          this._cb[message.id] = callback;
          this._ws.send(JSON.stringify(message));
        } else {
          this._buffer.push({message, callback});
        }
        return true;
      })
      .catch((err) => {
        if (retry-- > 0) {
          log.debug('Failed to send message: ', message);
          log.debug('Retry: ', retry);
          return this.send(message, callback, retry);
        } else {
          log.error('Failed to send message to websocket: ', err);
          return false;
        }
      });
  },

  _generate_msg_id() {
    return lazy.uuid.v4().split('-')[4];
  }
};

var Client = {
  status(action_name, pub = true) {
    var pid = Agent.agentPid();
    var status_obj = {
      pid     : pid,
      agent   : pid.running,
      docker  : false,
      balancer: false,
    };

    if (pub) {
      var status = status_obj.agent ? "running" : "not_running";
      publish("agent.client.status", { type: "status", status });
    }

    return promiseResolve(status_obj);
  },

  start(opts) {
    return Agent.start(opts).then(() => { return 0; });
  },

  stop(opts) {
    return this.status()
    .then((status) => {
      if (status.agent) {
        return Agent.stop(opts).then((result) => {
          return { agent: result };
        });
      } else {
        return { agent: false };
      }
    });
  },

  configs() {
    return HttpClient
      .get('/configs')
      .spread((response, body) => { return body; });
  },

  watch(host_folder, guest_folder, opts = {}) {
    return defer((resolve, reject) => {
      var req = { action: 'watch', data: { host_folder, guest_folder, opts } };
      WebSocketClient.send(req, (res, end) => {
        switch (res.status) {
          case 'start':
            publish("sync.status", { type: "starting" });
            break;
          case 'sync':
            publish("sync.status", { type: "sync", status: res.data });
            break;
          case 'done' :
            end();
            resolve(true);
            break;
          case 'fail' :
            end();
            reject(res.err);
            break;
        }
      });
    });
  },

  unwatch(host_folder, guest_folder) {
    return defer((resolve, reject) => {
      var req = { action: 'unwatch', data: { host_folder, guest_folder } };
      WebSocketClient.send(req, (res, end) => {
        switch (res.status) {
          case 'fail':
            end();
            reject();
            break;
          default:
            end();
            resolve(true);
            break;
        }
      });
    });
  },

  watchers() {
    return defer((resolve) => {
      var req = { action: 'watchers' };
      WebSocketClient.send(req, (res, end) => {
        end();
        resolve(res);
      });
    });
  },

  ssh_opts() {
    var key = `-i ${config('agent:vm:ssh_key')}`;
    return {
      url : `${config('agent:vm:user')}@${config('agent:vm:ip')}`,
      opts: key + " -o StrictHostKeyChecking=no -o LogLevel=quiet -o UserKnownHostsFile=/dev/null",
    };
  },

  closeWs() {
    return WebSocketClient.close();
  },

  require() {
    return this
      .status()
      .then((status) => {
        if (status.agent) {
          return this.configs();
        }
        throw new AgentNotRunning();
      })
      .then((configs) => {
        _.each(configs, (value, key) => {
          set_config(key, value);
        });
      });
  },
};

export { Client, HttpClient, WebSocketClient };