azukiapp/azk

View on GitHub
src/sync/worker.js

Summary

Maintainability
A
1 hr
Test Coverage
import { _, lazy_require, path, fsAsync } from 'azk';
import { defer, async, promiseResolve } from 'azk/utils/promises';

// Only for debug
// require('source-map-support').install({});

// Dependencies obtained through azk's `lazy_require` helper; the rest of this
// file reaches them as `lazy.Sync` and `lazy.chokidar`.
// NOTE(review): presumably each module is only loaded on first access --
// confirm against the implementation of `lazy_require`.
var lazy = lazy_require({
  Sync    : ['azk/sync'],
  chokidar: 'chokidar'
});

// Lifecycle states stored in `Worker#status`.
const WATCH_IDLE  = 0; // nothing is being watched (initial state, and after `unwatch`)
const WATCH_INIT  = 1; // `watch` was called; initial sync / watcher start-up in progress
const WATCH_READY = 2; // the watcher was started and 'watch ready' was sent

/**
 * Sync worker.
 *
 * Listens for `message` events on the given process object. Each message
 * triggers an initial synchronization from `origin` to `destination` followed
 * by a file watcher that re-syncs every changed path. Progress and failures
 * are reported back through `process.send` as JSON strings of the form
 * `{ op, status, ... }`.
 */
export class Worker {
  /**
   * @param {Object} process - Object exposing `on('message', fn)` and
   *   `send(string)`; its `title` is overwritten on every received message.
   */
  constructor(process) {
    this.chok    = null;
    this.status  = WATCH_IDLE;
    this.process = process;
    this.process.on('message', (data) => {
      process.title = "azk sync worker " + data.origin + " " + data.destination;
      // `watch` already reports its own failures through `_send`, so the
      // returned promise is deliberately settled with no-op handlers.
      this.watch(data.origin, data.destination, data.opts).then(() => {}, () => {});
    });
  }

  /**
   * Stops any current watcher, performs a full sync and then starts watching
   * `origin` for changes.
   *
   * Note that `opts` is mutated: `include` and `except` are replaced by the
   * processed pattern lists and `except_from` is deleted.
   *
   * @param {string} origin - Path to synchronize from.
   * @param {string} destination - Path to synchronize to.
   * @param {Object} [opts] - Options; `include`, `except` and `except_from`
   *   are read here, the rest is forwarded to `lazy.Sync.sync`.
   * @returns {Promise} Settles when the watcher is ready (or when the run was
   *   superseded); rejects with the error that was also sent as 'sync fail'.
   */
  watch(origin, destination, opts = {}) {
    this.unwatch();
    this.status = WATCH_INIT;

    return async(this, function* () {
      try {
        // Be sure watcher is in proper status
        if (this.status !== WATCH_INIT) { return; }

        const exists = yield fsAsync.exists(origin);
        if (!exists) {
          // Plain object kept on purpose: `_send` merges `err` and `code`
          // into the message sent to the parent process.
          throw { err: 'Sync: origin path not exist', code: 101 };
        }

        const stats = yield fsAsync.stat(origin);
        // `isDirectory` must be *called*; the bare function reference is
        // always truthy and would make this guard unreachable.
        if (!(stats.isDirectory() || stats.isFile() || stats.isSymbolicLink())) {
          throw new Error(`The type of the file ${origin} is not supported for synchronization`);
        }

        // Calculate includes, excludes and watch pattern
        let patterns_ary = [], ignored = [];
        let except_from  = yield this._check_for_except_from(origin, stats.isDirectory(), opts.except_from);
        [ patterns_ary, ignored, opts.include, opts.except ] = yield this._process_patterns(origin, except_from, opts);
        delete opts.except_from;

        // Be sure watcher is in proper status
        if (this.status !== WATCH_INIT) { return; }

        yield lazy.Sync.sync(origin, destination, opts);
        this._send('sync', 'done');

        // Be sure watcher is in proper status
        if (this.status !== WATCH_INIT) { return; }
        yield this._start_watcher(patterns_ary, ignored, origin, destination, opts);
        this._send('watch', 'ready');
        this.status = WATCH_READY;
      } catch (err) {
        this.unwatch();
        this._send('sync', 'fail', err);
        throw err;
      }
    });
  }

  /**
   * Closes the current watcher (sending 'sync close' if there was one) and
   * resets the status to idle.
   */
  unwatch() {
    if (!_.isEmpty(this.chok)) {
      this._send('sync', 'close');
      this.chok.close();
      this.chok = null;
    }
    this.status = WATCH_IDLE;
  }

  /**
   * Creates the chokidar watcher and wires its events.
   *
   * @param {string[]} patterns_ary - Paths/patterns to watch.
   * @param {string[]} ignored - Paths/patterns passed as chokidar's `ignored`.
   * @param {string} src - Sync origin, used to compute relative paths.
   * @param {string} dest - Sync destination.
   * @param {Object} [opts] - Options forwarded to `_sync`.
   * @returns {Promise} Resolves with `true` on chokidar's 'ready' event,
   *   rejects if an 'error' event arrives first.
   */
  _start_watcher(patterns_ary, ignored, src, dest, opts = {}) {
    let promise = defer((resolve, reject) => {
      this.chok = lazy.chokidar.watch(patterns_ary, { ignored: ignored, ignoreInitial: true })
        .on('all', (event, filepath) => {
          this._sync(event, filepath, src, dest, opts);
        })
        .on('ready', () => {
          // NOTE(review): this also removes the handler below, so its
          // "restart" branch looks unreachable once the watcher is ready --
          // confirm whether that is intended.
          this.chok.removeAllListeners('error');
          resolve(true);
        })
        .on('error', (err) => {
          if (promise.isFulfilled()) {
            err.level = "warning";
            this._send('watch', 'fail', err);
            // restart
            this.watch(src, dest, opts);
          } else {
            err.level = "critical";
            reject(err);
          }
        });
    });
    return promise;
  }

  /**
   * Synchronizes a single changed path and reports the outcome via `_send`.
   *
   * For `unlink`/`unlinkDir` events whose path no longer exists, the nearest
   * parent (`filepath/..`) is synchronized instead, recursively.
   *
   * @param {string} event - Watcher event name; used as the `op` of the report.
   * @param {string} filepath - Path that changed.
   * @param {string} src - Sync origin.
   * @param {string} dest - Sync destination.
   * @param {Object} opts - Reads `ssh`, `relative_sufix`, `include`, `except`.
   * @returns {Promise} Settles after the report was sent; sync failures are
   *   reported as '<event> fail' with level "warning" instead of rejecting.
   */
  _sync(event, filepath, src, dest, opts) {
    return async(this, function* () {
      if (event === "unlink" || event === "unlinkDir") {
        const exists = yield fsAsync.exists(filepath);
        if (!exists) {
          filepath = path.join(filepath, '..');
          return this._sync(event, filepath, src, dest, opts);
        }
      }

      let destination = path.join(dest, path.relative(src, filepath));
      let [ include, except ] = this._include_and_except(filepath, src, dest, opts);
      let sync_opts = {
        ssh: opts.ssh || null,
        relative_sufix: opts.relative_sufix,
        include, except,
      };

      return lazy.Sync
        .sync(filepath, destination, sync_opts)
        .then((result) => this._send(event, 'done', _.merge(result, { filepath })))
        .catch((err) => {
          err = _.assign(err, { filepath, level: "warning" });
          this._send(event, 'fail', err);
        });
    });
  }

  /**
   * Rebases the include/except patterns so they apply to a sync rooted at
   * `origin` instead of `src`.
   *
   * When `origin === src` the lists are returned untouched. Otherwise:
   *  - patterns starting with `/` or `./` are kept only if they match
   *    `/<relative>/...`, and that prefix is replaced by `/`;
   *  - all other patterns are kept as they are.
   *
   * @param {string} origin - Path being synchronized.
   * @param {string} src - Root of the whole sync.
   * @param {string} dest - Unused here; kept for signature compatibility.
   * @param {Object} opts - Provides `include` and `except`.
   * @returns {Array} `[include, except]`.
   */
  _include_and_except(origin, src, dest, opts) {
    let include = opts.include;
    let except  = opts.except;

    if (origin !== src) {
      let relative  = path.relative(src, origin);
      // The path is matched literally: escape regex metacharacters so names
      // such as `c++` or `a.b` neither throw nor over-match.
      let escaped   = relative.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
      let regex     = new RegExp(`^/${escaped}/`);
      let reduce_fn = (acc, file) => {
        let anchored = file.match(/^\//) || file.match(/^\.\//);
        if (anchored && regex.test(file)) {
          acc.push(file.replace(regex, '/'));
        } else if (!anchored) {
          acc.push(file);
        }
        return acc;
      };
      include = _.reduce(include, reduce_fn, []);
      except  = _.reduce(except, reduce_fn, []);
    }

    return [include, except];
  }

  /**
   * Picks the file from which exclusion patterns are read.
   *
   * Candidates are tried in order: the explicit `except_from` (if given),
   * `<origin>/.syncignore`, `<origin>/.gitignore`.
   *
   * @param {string} origin - Sync origin.
   * @param {boolean} is_dir - Whether `origin` is a directory.
   * @param {string} [except_from] - Explicit ignore file.
   * @returns {Promise<string|null>} First existing candidate, or `null` when
   *   none exists or `origin` is not a directory.
   */
  _check_for_except_from(origin, is_dir, except_from) {
    return async(this, function* () {
      if (!is_dir) { return null; }

      let candidates = except_from ? [except_from] : [];
      candidates = candidates.concat([
        path.join(origin, ".syncignore"),
        path.join(origin, ".gitignore"),
      ]);

      let exists;
      for (let i = 0; i < candidates.length; i++) {
        exists = yield fsAsync.exists(candidates[i]);
        if (exists) { return candidates[i]; }
      }

      return null;
    });
  }

  /**
   * Builds the watch/ignore lists for chokidar and the include/exclude lists
   * for the sync from `opts` and the ignore file.
   *
   * Candidate patterns are classified as:
   *  - `!pattern`: negation; added to `include` and to the watch list;
   *  - starting with `/` or `./`, or containing `**`: excluded, ignored
   *    relative to `origin`;
   *  - anything else: excluded, ignored at any depth (`origin/**` + pattern).
   *
   * @param {string} origin - Sync origin.
   * @param {string|null} except_from - Ignore file, see `_check_for_except_from`.
   * @param {Object} [opts] - Reads `include` and `except`.
   * @returns {Promise<Array>} `[watch, unwatch, include, exclude]`.
   */
  _process_patterns(origin, except_from, opts = {}) {
    return this
      ._exclude_candidates(opts.except || [], except_from)
      .then((candidates) => {
        let include = [...(opts.include || [])];
        let watch   = _.map([".", ...include], (i) => path.resolve(origin, i));
        let unwatch = [];

        let exclude = _.reduce(candidates, (acc, pattern) => {
          if (pattern.match(/^!/)) {
            pattern = pattern.replace(/^!(.*)/, '$1');
            include.push(pattern);
            watch.push(path.join(origin, pattern));
          } else if (pattern.match(/^\//) || pattern.match(/^\.\//) || pattern.match(/\*\*/)) {
            acc.push(pattern);
            unwatch.push(path.join(origin, pattern));
          } else {
            acc.push(pattern);
            unwatch.push(path.join(origin, '**', pattern));
          }
          return acc;
        }, []);

        return [watch, unwatch, include, exclude];
      });
  }

  /**
   * Joins the explicit `except` patterns with the ones read from the ignore
   * file, skipping empty lines and `#` comments.
   *
   * @param {string[]} except - Explicit exclusion patterns.
   * @param {string|null} except_from - Ignore file path; skipped when empty.
   * @returns {Promise<string[]>} All exclusion candidates.
   */
  _exclude_candidates(except, except_from) {
    if (!_.isEmpty(except_from)) {
      return fsAsync.readFile(except_from).then((content) => {
        // Accept both LF and CRLF files so patterns never carry a stray '\r'.
        content = content.toString().split(/\r?\n/);
        return except.concat(_.filter(content, (pattern) => {
          return !_.isEmpty(pattern) && !pattern.match(/^\s*#/);
        }));
      });
    }
    return promiseResolve(except);
  }

  /**
   * Sends a JSON-encoded `{ op, status, ...opts }` message to the process.
   *
   * `Error` instances are first round-tripped through JSON with an explicit
   * key list ("message", "arguments", "type", "name" plus the error's own
   * enumerable keys) so those fields end up in the message.
   *
   * @param {string} op - Operation name (e.g. 'sync', 'watch' or an event).
   * @param {string} status - Outcome (e.g. 'done', 'fail', 'ready', 'close').
   * @param {Object|Error} [opts] - Extra payload merged into the message.
   */
  _send(op, status, opts = {}) {
    if (opts instanceof Error) {
      opts = JSON.stringify(opts, ["message", "arguments", "type", "name"].concat(Object.keys(opts)));
      opts = JSON.parse(opts);
    }
    this.process.send(JSON.stringify(_.merge({ op, status }, opts)));
  }
}

// If this file is the main module of the process, it means that
// this process is being forked by azk itself: give the process a
// recognizable title and start a worker bound to it.
if (require.main === module) {
  process.title = 'azk sync worker';
  new Worker(process);
}