src/path-watcher.js

Summary

Maintainability
D
2 days
Test Coverage
const fs = require('fs');
const path = require('path');

const { Emitter, Disposable, CompositeDisposable } = require('event-kit');
const nsfw = require('@atom/nsfw');
const watcher = require('@atom/watcher');
const { NativeWatcherRegistry } = require('./native-watcher-registry');

// Private: Associate native watcher action flags with descriptive String equivalents.
const ACTION_MAP = new Map([
  [nsfw.actions.MODIFIED, 'modified'],
  [nsfw.actions.CREATED, 'created'],
  [nsfw.actions.DELETED, 'deleted'],
  [nsfw.actions.RENAMED, 'renamed']
]);

// Private: Possible states of a {NativeWatcher}.
const WATCHER_STATE = {
  STOPPED: Symbol('stopped'),
  STARTING: Symbol('starting'),
  RUNNING: Symbol('running'),
  STOPPING: Symbol('stopping')
};

// Private: Interface with and normalize events from a filesystem watcher implementation.
class NativeWatcher {
  // Private: Initialize a native watcher on a path.
  //
  // Events will not be produced until {start()} is called.
  constructor(normalizedPath) {
    this.normalizedPath = normalizedPath;
    this.emitter = new Emitter();
    this.subs = new CompositeDisposable();

    this.state = WATCHER_STATE.STOPPED;

    this.onEvents = this.onEvents.bind(this);
    this.onError = this.onError.bind(this);
  }

  // Private: Begin watching for filesystem events.
  //
  // Has no effect if the watcher has already been started.
  async start() {
    if (this.state !== WATCHER_STATE.STOPPED) {
      return;
    }
    this.state = WATCHER_STATE.STARTING;

    await this.doStart();

    this.state = WATCHER_STATE.RUNNING;
    this.emitter.emit('did-start');
  }

  doStart() {
    return Promise.reject(new Error('doStart() not overridden'));
  }

  // Private: Return true if the underlying watcher is actively listening for filesystem events.
  isRunning() {
    return this.state === WATCHER_STATE.RUNNING;
  }

  // Private: Register a callback to be invoked when the filesystem watcher has been initialized.
  //
  // Returns: A {Disposable} to revoke the subscription.
  onDidStart(callback) {
    return this.emitter.on('did-start', callback);
  }

  // Private: Register a callback to be invoked with normalized filesystem events as they arrive. Starts the watcher
  // automatically if it is not already running. The watcher will be stopped automatically when all subscribers
  // dispose their subscriptions.
  //
  // Returns: A {Disposable} to revoke the subscription.
  onDidChange(callback) {
    this.start();

    const sub = this.emitter.on('did-change', callback);
    return new Disposable(() => {
      sub.dispose();
      if (this.emitter.listenerCountForEventName('did-change') === 0) {
        this.stop();
      }
    });
  }

  // Private: Register a callback to be invoked when a {Watcher} should attach to a different {NativeWatcher}.
  //
  // Returns: A {Disposable} to revoke the subscription.
  onShouldDetach(callback) {
    return this.emitter.on('should-detach', callback);
  }

  // Private: Register a callback to be invoked when a {NativeWatcher} is about to be stopped.
  //
  // Returns: A {Disposable} to revoke the subscription.
  onWillStop(callback) {
    return this.emitter.on('will-stop', callback);
  }

  // Private: Register a callback to be invoked when the filesystem watcher has been stopped.
  //
  // Returns: A {Disposable} to revoke the subscription.
  onDidStop(callback) {
    return this.emitter.on('did-stop', callback);
  }

  // Private: Register a callback to be invoked with any errors reported from the watcher.
  //
  // Returns: A {Disposable} to revoke the subscription.
  onDidError(callback) {
    return this.emitter.on('did-error', callback);
  }

  // Private: Broadcast an `onShouldDetach` event to prompt any {Watcher} instances bound here to attach to a new
  // {NativeWatcher} instead.
  //
  // * `replacement` the new {NativeWatcher} instance that a live {Watcher} instance should reattach to instead.
  // * `watchedPath` absolute path watched by the new {NativeWatcher}.
  reattachTo(replacement, watchedPath, options) {
    this.emitter.emit('should-detach', { replacement, watchedPath, options });
  }

  // Private: Stop the native watcher and release any operating system resources associated with it.
  //
  // Has no effect if the watcher is not running.
  async stop() {
    if (this.state !== WATCHER_STATE.RUNNING) {
      return;
    }
    this.state = WATCHER_STATE.STOPPING;
    this.emitter.emit('will-stop');

    await this.doStop();

    this.state = WATCHER_STATE.STOPPED;

    this.emitter.emit('did-stop');
  }

  doStop() {
    return Promise.resolve();
  }

  // Private: Detach any event subscribers.
  dispose() {
    this.emitter.dispose();
  }

  // Private: Callback function invoked by the native watcher when a debounced group of filesystem events arrive.
  // Normalize and re-broadcast them to any subscribers.
  //
  // * `events` An Array of filesystem events.
  onEvents(events) {
    this.emitter.emit('did-change', events);
  }

  // Private: Callback function invoked by the native watcher when an error occurs.
  //
  // * `err` The native filesystem error.
  onError(err) {
    this.emitter.emit('did-error', err);
  }
}

// Private: Emulate a "filesystem watcher" by subscribing to Atom events like buffers being saved. This will miss
// any changes made to files outside of Atom, but it also has no overhead.
class AtomNativeWatcher extends NativeWatcher {
  async doStart() {
    const getRealPath = givenPath => {
      if (!givenPath) {
        return Promise.resolve(null);
      }

      return new Promise(resolve => {
        fs.realpath(givenPath, (err, resolvedPath) => {
          err ? resolve(null) : resolve(resolvedPath);
        });
      });
    };

    this.subs.add(
      atom.workspace.observeTextEditors(async editor => {
        let realPath = await getRealPath(editor.getPath());
        if (!realPath || !realPath.startsWith(this.normalizedPath)) {
          return;
        }

        const announce = (action, oldPath) => {
          const payload = { action, path: realPath };
          if (oldPath) payload.oldPath = oldPath;
          this.onEvents([payload]);
        };

        const buffer = editor.getBuffer();

        this.subs.add(buffer.onDidConflict(() => announce('modified')));
        this.subs.add(buffer.onDidReload(() => announce('modified')));
        this.subs.add(
          buffer.onDidSave(event => {
            if (event.path === realPath) {
              announce('modified');
            } else {
              const oldPath = realPath;
              realPath = event.path;
              announce('renamed', oldPath);
            }
          })
        );

        this.subs.add(buffer.onDidDelete(() => announce('deleted')));

        this.subs.add(
          buffer.onDidChangePath(newPath => {
            if (newPath !== this.normalizedPath) {
              const oldPath = this.normalizedPath;
              this.normalizedPath = newPath;
              announce('renamed', oldPath);
            }
          })
        );
      })
    );

    // Giant-ass brittle hack to hook files (and eventually directories) created from the TreeView.
    const treeViewPackage = await atom.packages.getLoadedPackage('tree-view');
    if (!treeViewPackage) return;
    await treeViewPackage.activationPromise;
    const treeViewModule = treeViewPackage.mainModule;
    if (!treeViewModule) return;
    const treeView = treeViewModule.getTreeViewInstance();

    const isOpenInEditor = async eventPath => {
      const openPaths = await Promise.all(
        atom.workspace
          .getTextEditors()
          .map(editor => getRealPath(editor.getPath()))
      );
      return openPaths.includes(eventPath);
    };

    this.subs.add(
      treeView.onFileCreated(async event => {
        const realPath = await getRealPath(event.path);
        if (!realPath) return;

        this.onEvents([{ action: 'added', path: realPath }]);
      })
    );

    this.subs.add(
      treeView.onEntryDeleted(async event => {
        const realPath = await getRealPath(event.path);
        if (!realPath || (await isOpenInEditor(realPath))) return;

        this.onEvents([{ action: 'deleted', path: realPath }]);
      })
    );

    this.subs.add(
      treeView.onEntryMoved(async event => {
        const [realNewPath, realOldPath] = await Promise.all([
          getRealPath(event.newPath),
          getRealPath(event.initialPath)
        ]);
        if (
          !realNewPath ||
          !realOldPath ||
          (await isOpenInEditor(realNewPath)) ||
          (await isOpenInEditor(realOldPath))
        )
          return;

        this.onEvents([
          { action: 'renamed', path: realNewPath, oldPath: realOldPath }
        ]);
      })
    );
  }
}

// Private: Implement a native watcher by translating events from an NSFW watcher.
class NSFWNativeWatcher extends NativeWatcher {
  async doStart(rootPath, eventCallback, errorCallback) {
    const handler = events => {
      this.onEvents(
        events.map(event => {
          const action =
            ACTION_MAP.get(event.action) || `unexpected (${event.action})`;
          const payload = { action };

          if (event.file) {
            payload.path = path.join(event.directory, event.file);
          } else {
            payload.oldPath = path.join(
              event.directory,
              typeof event.oldFile === 'undefined' ? '' : event.oldFile
            );
            payload.path = path.join(
              event.directory,
              typeof event.newFile === 'undefined' ? '' : event.newFile
            );
          }

          return payload;
        })
      );
    };

    this.watcher = await nsfw(this.normalizedPath, handler, {
      debounceMS: 100,
      errorCallback: this.onError
    });

    await this.watcher.start();
  }

  doStop() {
    return this.watcher.stop();
  }
}

// Extended: Manage a subscription to filesystem events that occur beneath a root directory. Construct these by
// calling `watchPath`. To watch for events within active project directories, use {Project::onDidChangeFiles}
// instead.
//
// Multiple PathWatchers may be backed by a single native watcher to conserve operation system resources.
//
// Call {::dispose} to stop receiving events and, if possible, release underlying resources. A PathWatcher may be
// added to a {CompositeDisposable} to manage its lifetime along with other {Disposable} resources like event
// subscriptions.
//
// ```js
// const {watchPath} = require('atom')
//
// const disposable = await watchPath('/var/log', {}, events => {
//   console.log(`Received batch of ${events.length} events.`)
//   for (const event of events) {
//     // "created", "modified", "deleted", "renamed"
//     console.log(`Event action: ${event.action}`)
//
//     // absolute path to the filesystem entry that was touched
//     console.log(`Event path: ${event.path}`)
//
//     if (event.action === 'renamed') {
//       console.log(`.. renamed from: ${event.oldPath}`)
//     }
//   }
// })
//
//  // Immediately stop receiving filesystem events. If this is the last
//  // watcher, asynchronously release any OS resources required to
//  // subscribe to these events.
//  disposable.dispose()
// ```
//
// `watchPath` accepts the following arguments:
//
// `rootPath` {String} specifies the absolute path to the root of the filesystem content to watch.
//
// `options` Control the watcher's behavior. Currently a placeholder.
//
// `eventCallback` {Function} to be called each time a batch of filesystem events is observed. Each event object has
// the keys: `action`, a {String} describing the filesystem action that occurred, one of `"created"`, `"modified"`,
// `"deleted"`, or `"renamed"`; `path`, a {String} containing the absolute path to the filesystem entry that was acted
// upon; for rename events only, `oldPath`, a {String} containing the filesystem entry's former absolute path.
class PathWatcher {
  // Private: Instantiate a new PathWatcher. Call {watchPath} instead.
  //
  // * `nativeWatcherRegistry` {NativeWatcherRegistry} used to find and consolidate redundant watchers.
  // * `watchedPath` {String} containing the absolute path to the root of the watched filesystem tree.
  // * `options` See {watchPath} for options.
  //
  constructor(nativeWatcherRegistry, watchedPath, options) {
    this.watchedPath = watchedPath;
    this.nativeWatcherRegistry = nativeWatcherRegistry;

    this.normalizedPath = null;
    this.native = null;
    this.changeCallbacks = new Map();

    this.attachedPromise = new Promise(resolve => {
      this.resolveAttachedPromise = resolve;
    });

    this.startPromise = new Promise((resolve, reject) => {
      this.resolveStartPromise = resolve;
      this.rejectStartPromise = reject;
    });

    this.normalizedPathPromise = new Promise((resolve, reject) => {
      fs.realpath(watchedPath, (err, real) => {
        if (err) {
          reject(err);
          return;
        }

        this.normalizedPath = real;
        resolve(real);
      });
    });
    this.normalizedPathPromise.catch(err => this.rejectStartPromise(err));

    this.emitter = new Emitter();
    this.subs = new CompositeDisposable();
  }

  // Private: Return a {Promise} that will resolve with the normalized root path.
  getNormalizedPathPromise() {
    return this.normalizedPathPromise;
  }

  // Private: Return a {Promise} that will resolve the first time that this watcher is attached to a native watcher.
  getAttachedPromise() {
    return this.attachedPromise;
  }

  // Extended: Return a {Promise} that will resolve when the underlying native watcher is ready to begin sending events.
  // When testing filesystem watchers, it's important to await this promise before making filesystem changes that you
  // intend to assert about because there will be a delay between the instantiation of the watcher and the activation
  // of the underlying OS resources that feed its events.
  //
  // PathWatchers acquired through `watchPath` are already started.
  //
  // ```js
  // const {watchPath} = require('atom')
  // const ROOT = path.join(__dirname, 'fixtures')
  // const FILE = path.join(ROOT, 'filename.txt')
  //
  // describe('something', function () {
  //   it("doesn't miss events", async function () {
  //     const watcher = watchPath(ROOT, {}, events => {})
  //     await watcher.getStartPromise()
  //     fs.writeFile(FILE, 'contents\n', err => {
  //       // The watcher is listening and the event should be
  //       // received asynchronously
  //     }
  //   })
  // })
  // ```
  getStartPromise() {
    return this.startPromise;
  }

  // Private: Attach another {Function} to be called with each batch of filesystem events. See {watchPath} for the
  // spec of the callback's argument.
  //
  // * `callback` {Function} to be called with each batch of filesystem events.
  //
  // Returns a {Disposable} that will stop the underlying watcher when all callbacks mapped to it have been disposed.
  onDidChange(callback) {
    if (this.native) {
      const sub = this.native.onDidChange(events =>
        this.onNativeEvents(events, callback)
      );
      this.changeCallbacks.set(callback, sub);

      this.native.start();
    } else {
      // Attach to a new native listener and retry
      this.nativeWatcherRegistry.attach(this).then(() => {
        this.onDidChange(callback);
      });
    }

    return new Disposable(() => {
      const sub = this.changeCallbacks.get(callback);
      this.changeCallbacks.delete(callback);
      sub.dispose();
    });
  }

  // Extended: Invoke a {Function} when any errors related to this watcher are reported.
  //
  // * `callback` {Function} to be called when an error occurs.
  //   * `err` An {Error} describing the failure condition.
  //
  // Returns a {Disposable}.
  onDidError(callback) {
    return this.emitter.on('did-error', callback);
  }

  // Private: Wire this watcher to an operating system-level native watcher implementation.
  attachToNative(native) {
    this.subs.dispose();
    this.native = native;

    if (native.isRunning()) {
      this.resolveStartPromise();
    } else {
      this.subs.add(
        native.onDidStart(() => {
          this.resolveStartPromise();
        })
      );
    }

    // Transfer any native event subscriptions to the new NativeWatcher.
    for (const [callback, formerSub] of this.changeCallbacks) {
      const newSub = native.onDidChange(events =>
        this.onNativeEvents(events, callback)
      );
      this.changeCallbacks.set(callback, newSub);
      formerSub.dispose();
    }

    this.subs.add(
      native.onDidError(err => {
        this.emitter.emit('did-error', err);
      })
    );

    this.subs.add(
      native.onShouldDetach(({ replacement, watchedPath }) => {
        if (
          this.native === native &&
          replacement !== native &&
          this.normalizedPath.startsWith(watchedPath)
        ) {
          this.attachToNative(replacement);
        }
      })
    );

    this.subs.add(
      native.onWillStop(() => {
        if (this.native === native) {
          this.subs.dispose();
          this.native = null;
        }
      })
    );

    this.resolveAttachedPromise();
  }

  // Private: Invoked when the attached native watcher creates a batch of native filesystem events. The native watcher's
  // events may include events for paths above this watcher's root path, so filter them to only include the relevant
  // ones, then re-broadcast them to our subscribers.
  onNativeEvents(events, callback) {
    const isWatchedPath = eventPath =>
      eventPath.startsWith(this.normalizedPath);

    const filtered = [];
    for (let i = 0; i < events.length; i++) {
      const event = events[i];

      if (event.action === 'renamed') {
        const srcWatched = isWatchedPath(event.oldPath);
        const destWatched = isWatchedPath(event.path);

        if (srcWatched && destWatched) {
          filtered.push(event);
        } else if (srcWatched && !destWatched) {
          filtered.push({
            action: 'deleted',
            kind: event.kind,
            path: event.oldPath
          });
        } else if (!srcWatched && destWatched) {
          filtered.push({
            action: 'created',
            kind: event.kind,
            path: event.path
          });
        }
      } else {
        if (isWatchedPath(event.path)) {
          filtered.push(event);
        }
      }
    }

    if (filtered.length > 0) {
      callback(filtered);
    }
  }

  // Extended: Unsubscribe all subscribers from filesystem events. Native resources will be released asynchronously,
  // but this watcher will stop broadcasting events immediately.
  dispose() {
    for (const sub of this.changeCallbacks.values()) {
      sub.dispose();
    }

    this.emitter.dispose();
    this.subs.dispose();
  }
}

// Private: Globally tracked state used to de-duplicate related [PathWatchers]{PathWatcher} backed by emulated Atom
// events or NSFW.
class PathWatcherManager {
  // Private: Access the currently active manager instance, creating one if necessary.
  static active() {
    if (!this.activeManager) {
      this.activeManager = new PathWatcherManager(
        atom.config.get('core.fileSystemWatcher')
      );
      this.sub = atom.config.onDidChange(
        'core.fileSystemWatcher',
        ({ newValue }) => {
          this.transitionTo(newValue);
        }
      );
    }
    return this.activeManager;
  }

  // Private: Replace the active {PathWatcherManager} with a new one that creates [NativeWatchers]{NativeWatcher}
  // based on the value of `setting`.
  static async transitionTo(setting) {
    const current = this.active();

    if (this.transitionPromise) {
      await this.transitionPromise;
    }

    if (current.setting === setting) {
      return;
    }
    current.isShuttingDown = true;

    let resolveTransitionPromise = () => {};
    this.transitionPromise = new Promise(resolve => {
      resolveTransitionPromise = resolve;
    });

    const replacement = new PathWatcherManager(setting);
    this.activeManager = replacement;

    await Promise.all(
      Array.from(current.live, async ([root, native]) => {
        const w = await replacement.createWatcher(root, {}, () => {});
        native.reattachTo(w.native, root, w.native.options || {});
      })
    );

    current.stopAllWatchers();

    resolveTransitionPromise();
    this.transitionPromise = null;
  }

  // Private: Initialize global {PathWatcher} state.
  constructor(setting) {
    this.setting = setting;
    this.live = new Map();

    const initLocal = NativeConstructor => {
      this.nativeRegistry = new NativeWatcherRegistry(normalizedPath => {
        const nativeWatcher = new NativeConstructor(normalizedPath);

        this.live.set(normalizedPath, nativeWatcher);
        const sub = nativeWatcher.onWillStop(() => {
          this.live.delete(normalizedPath);
          sub.dispose();
        });

        return nativeWatcher;
      });
    };

    if (setting === 'atom') {
      initLocal(AtomNativeWatcher);
    } else if (setting === 'experimental') {
      //
    } else if (setting === 'poll') {
      //
    } else {
      initLocal(NSFWNativeWatcher);
    }

    this.isShuttingDown = false;
  }

  useExperimentalWatcher() {
    return this.setting === 'experimental' || this.setting === 'poll';
  }

  // Private: Create a {PathWatcher} tied to this global state. See {watchPath} for detailed arguments.
  async createWatcher(rootPath, options, eventCallback) {
    if (this.isShuttingDown) {
      await this.constructor.transitionPromise;
      return PathWatcherManager.active().createWatcher(
        rootPath,
        options,
        eventCallback
      );
    }

    if (this.useExperimentalWatcher()) {
      if (this.setting === 'poll') {
        options.poll = true;
      }

      const w = await watcher.watchPath(rootPath, options, eventCallback);
      this.live.set(rootPath, w.native);
      return w;
    }

    const w = new PathWatcher(this.nativeRegistry, rootPath, options);
    w.onDidChange(eventCallback);
    await w.getStartPromise();
    return w;
  }

  // Private: Directly access the {NativeWatcherRegistry}.
  getRegistry() {
    if (this.useExperimentalWatcher()) {
      return watcher.getRegistry();
    }

    return this.nativeRegistry;
  }

  // Private: Sample watcher usage statistics. Only available for experimental watchers.
  status() {
    if (this.useExperimentalWatcher()) {
      return watcher.status();
    }

    return {};
  }

  // Private: Return a {String} depicting the currently active native watchers.
  print() {
    if (this.useExperimentalWatcher()) {
      return watcher.printWatchers();
    }

    return this.nativeRegistry.print();
  }

  // Private: Stop all living watchers.
  //
  // Returns a {Promise} that resolves when all native watcher resources are disposed.
  stopAllWatchers() {
    if (this.useExperimentalWatcher()) {
      return watcher.stopAllWatchers();
    }

    return Promise.all(Array.from(this.live, ([, w]) => w.stop()));
  }
}

// Extended: Invoke a callback with each filesystem event that occurs beneath a specified path. If you only need to
// watch events within the project's root paths, use {Project::onDidChangeFiles} instead.
//
// watchPath handles the efficient re-use of operating system resources across living watchers. Watching the same path
// more than once, or the child of a watched path, will re-use the existing native watcher.
//
// * `rootPath` {String} specifies the absolute path to the root of the filesystem content to watch.
// * `options` Control the watcher's behavior.
// * `eventCallback` {Function} or other callable to be called each time a batch of filesystem events is observed.
//    * `events` {Array} of objects that describe the events that have occurred.
//      * `action` {String} describing the filesystem action that occurred. One of `"created"`, `"modified"`,
//        `"deleted"`, or `"renamed"`.
//      * `path` {String} containing the absolute path to the filesystem entry that was acted upon.
//      * `oldPath` For rename events, {String} containing the filesystem entry's former absolute path.
//
// Returns a {Promise} that will resolve to a {PathWatcher} once it has started. Note that every {PathWatcher}
// is a {Disposable}, so they can be managed by a {CompositeDisposable} if desired.
//
// ```js
// const {watchPath} = require('atom')
//
// const disposable = await watchPath('/var/log', {}, events => {
//   console.log(`Received batch of ${events.length} events.`)
//   for (const event of events) {
//     // "created", "modified", "deleted", "renamed"
//     console.log(`Event action: ${event.action}`)
//     // absolute path to the filesystem entry that was touched
//     console.log(`Event path: ${event.path}`)
//     if (event.action === 'renamed') {
//       console.log(`.. renamed from: ${event.oldPath}`)
//     }
//   }
// })
//
//  // Immediately stop receiving filesystem events. If this is the last watcher, asynchronously release any OS
//  // resources required to subscribe to these events.
//  disposable.dispose()
// ```
//
function watchPath(rootPath, options, eventCallback) {
  return PathWatcherManager.active().createWatcher(
    rootPath,
    options,
    eventCallback
  );
}

// Private: Return a Promise that resolves when all {NativeWatcher} instances associated with a FileSystemManager
// have stopped listening. This is useful for `afterEach()` blocks in unit tests.
function stopAllWatchers() {
  return PathWatcherManager.active().stopAllWatchers();
}

// Private: Show the currently active native watchers in a formatted {String}.
watchPath.printWatchers = function() {
  return PathWatcherManager.active().print();
};

// Private: Access the active {NativeWatcherRegistry}.
watchPath.getRegistry = function() {
  return PathWatcherManager.active().getRegistry();
};

// Private: Sample usage statistics for the active watcher.
watchPath.status = function() {
  return PathWatcherManager.active().status();
};

// Private: Configure @atom/watcher ("experimental") directly.
watchPath.configure = function(...args) {
  return watcher.configure(...args);
};

module.exports = { watchPath, stopAllWatchers };