azukiapp/azk

View on GitHub
src/docker/containers_observer.js

Summary

Maintainability
A
0 mins
Test Coverage
import { _, lazy_require, log } from 'azk';
import { defer, promiseResolve } from 'azk/utils/promises';
import { IPublisher } from 'azk/utils/postal';

var lazy = lazy_require({
  docker : ['azk/docker', 'default'],
  JStream: 'jstream',
});

// create, destroy, die, exec_create, exec_start, export, kill, oom, pause, restart, start, stop, unpause

export class ContainersObserver extends IPublisher  {
  constructor(filters = null) {
    super("containers.observer");
    this.stream    = null;
    this.stoping   = null;
    this.starting  = false;
    this.attemps   = 0;
    this.filters   = filters;
  }

  get stream() {
    return this.__stream;
  }

  set stream(value) {
    this.__stream = value;
    // Connect operations in stream
    if (value) {
      lazy.docker.modem.followProgress(
        value,
        this._stream_end.bind(this),
        this._handler.bind(this)
      );
    }
  }

  start(retry = 5, timeout = 3000) {
    if (this.starting || this.stoping) { return promiseResolve(); }
    this.starting = true;
    this.attemps++;

    var options = {};
    if (!_.isEmpty(this.filters)) {
      options.filters = JSON.stringify(this.filters);
    }

    log.debug('[docker] starting containers observer');
    return defer((resolve, reject) => {
      lazy.docker.getEvents(options)
        .then((stream) => {
          log.debug('[docker] containers observer started');
          this.stream   = stream;
          this.starting = false;
          this.attemps  = 0;
          resolve(true);
        })
        .timeout(timeout, "There was a timeout error connecting to docker to capture container events")
        .catch((err) => {
          var msg = `[docker] (attemps: ${this.attemps + 1}/${retry}) observer containers connect error`;
          log.info(msg, err.stack ? err.stack : err.toString());
          // reached the limit
          if (this.attemps > retry) { reject(err); }
        });
    });
  }

  _stream_end() {
    log.info('[docker] containers observer, stream end');
    if (this.stoping) {
      this.stream = null;
      return this.stoping();
    }
    this.start();
  }

  _handler(event) {
    if (event) {
      log.debug("[docker] container event", JSON.stringify(event));
      this.publish(event.status, event);
    }
  }

  stop() {
    return defer((resolve, reject) => {
      if (this.stream) {
        try {
          this.stoping = resolve;
          this.stream.req.abort();
        } catch (err) {
          reject(err);
        }
      } else {
        resolve();
      }
    });
  }
}