cozy-labs/cozy-desktop

View on GitHub
core/local/chokidar/watcher.js

Summary

Maintainability
C
1 day
Test Coverage
/**
 * This is the {@link module:core/local/watcher|local watcher} implementation
 * based on the {@link https://github.com/paulmillr/chokidar|chokidar} library.
 *
 * It's a library that uses nodejs' watch powered by inotify/fsevents (with a
 * fallback on polling).
 *
 * ## Steps
 *
 * 1. {@link module:core/local/chokidar/initial_scan|initial_scan}
 * 2. {@link module:core/local/chokidar/prepare_events|prepare_events}
 * 3. {@link module:core/local/chokidar/analysis|analysis} (macro step)
 * 4. {@link module:core/local/chokidar/send_to_prep|send_to_prep}
 *
 * @module core/local/chokidar/watcher
 * @flow
 */

const autoBind = require('auto-bind')
const Promise = require('bluebird')
const chokidar = require('chokidar')
const path = require('path')

const analysis = require('./analysis')
const checksumer = require('../checksumer')
const chokidarEvent = require('./event')
const LocalEventBuffer = require('./event_buffer')
const initialScan = require('./initial_scan')
const normalizePaths = require('./normalize_paths')
const prepareEvents = require('./prepare_events')
const sendToPrep = require('./send_to_prep')
const stater = require('../stater')
const syncDir = require('../sync_dir')
const { logger } = require('../../utils/logger')
const { measureTime } = require('../../utils/perfs')

const { LOCAL_WATCHER_FATAL_EVENT } = require('../constants')

/*::
import type { Pouch } from '../../pouch'
import type Prep from '../../prep'
import type { Checksumer } from '../checksumer'
import type { ChokidarEvent } from './event'
import type { InitialScanParams } from './initial_scan'
import type { LocalEvent } from './local_event'
import type { LocalChange } from './local_change'
import type EventEmitter from 'events'
import fs from 'fs'
*/

// Logger used for this module's own messages.
const log = logger({ component: 'ChokidarWatcher' })
// Child logger used for the events coming straight from chokidar
// (see the `log.chokidar.debug` / `log.chokidar.trace` calls below).
log.chokidar = log.child({ component: 'Chokidar' })

/**
 * Tell whether a chokidar event carries a non-empty path.
 *
 * Used to filter out events whose `path` is the empty string before they are
 * processed any further.
 *
 * @param ev - The chokidar event to inspect
 * @returns `false` only when `ev.path` is exactly `''`
 */
const hasPath = (ev /*: ChokidarEvent */) /*: boolean %checks */ =>
  ev.path !== ''

/**
 * Tell whether the lowest bit (0x00000001) of the given event flags is set.
 *
 * See https://developer.apple.com/documentation/coreservices/1455361-fseventstreameventflags/kfseventstreameventflagmustscansubdirs
 *
 * @param flags - Bit field attached to a raw watcher event
 * @returns `true` when bit 0x00000001 is set in `flags`
 */
const isRescanFlag = (flags /*: number */) /*: boolean %checks */ =>
  (flags & 0x1) !== 0

/**
 * This file contains the filesystem watcher that will trigger operations when
 * a file or a folder is added/removed/changed locally.
 * Operations will be added to a common operation queue along with the
 * remote operations triggered by the remoteEventWatcher.
 */
class LocalWatcher {
  /*::
  syncPath: string
  prep: Prep
  pouch: Pouch
  events: EventEmitter
  initialScanParams: InitialScanParams
  checksumer: Checksumer
  watcher: any // chokidar
  buffer: LocalEventBuffer<ChokidarEvent>
  pendingChanges: LocalChange[]
  */

  /**
   * Build a watcher for the given synchronized directory.
   *
   * Nothing is watched until `start()` is called.
   *
   * @param syncPath - Base directory, given to chokidar as `cwd`
   * @param prep - Prep instance, kept on the watcher
   * @param pouch - Pouch instance; its lock is taken in `onFlush`
   * @param events - Emitter on which buffering/local/fatal events are emitted
   */
  constructor(
    syncPath /*: string */,
    prep /*: Prep */,
    pouch /*: Pouch */,
    events /*: EventEmitter */
  ) {
    this.syncPath = syncPath
    this.prep = prep
    this.pouch = pouch
    this.events = events
    this.checksumer = checksumer.init()
    this.pendingChanges = []

    // To detect which files&folders have been removed since the last run of
    // cozy-desktop, we keep all the paths seen by chokidar during its
    // initial scan in @paths to compare them with pouchdb database.
    this.resetInitialScanParams()

    // XXX: this.onFlush must be bound before being passed to LocalEventBuffer
    autoBind(this)

    // TODO: Read from config
    const timeoutInMs = process.env.NODE_ENV === 'test' ? 2000 : 10000
    this.buffer = new LocalEventBuffer(timeoutInMs, async rawEvents => {
      try {
        await this.onFlush(rawEvents)
      } catch (err) {
        log.error('fatal chokidar watcher error', { err, sentry: true })
        this.fatal(err)
      }
    })
  }

  /** Start chokidar, the filesystem watcher
   *
   * Resets the initial scan parameters, creates the chokidar watcher and
   * registers the event handlers that feed the event buffer.
   *
   * @see https://github.com/paulmillr/chokidar
   * @returns A promise resolved when `endInitialScan()` is called, i.e. at the
   *   end of an `onFlush()` run
   */
  start() {
    log.info('Starting watcher...')

    this.resetInitialScanParams()

    const stopChokidarScanMeasure = measureTime('LocalWatcher#chokidarScan')
    const stopInitialScanMeasure = measureTime('LocalWatcher#initialScan')

    this.watcher = chokidar.watch('.', {
      // Let paths in events be relative to this base path
      cwd: this.syncPath,
      // Ignore our own .system-tmp-cozy-drive directory
      ignored: /(^|[\/\\])\.system-tmp-cozy-drive/, // eslint-disable-line no-useless-escape
      // Don't follow symlinks
      followSymlinks: false,
      // The stats object is used in methods below
      alwaysStat: true,
      // Watching on Windows seems to lock dirs with subdirs, preventing them
      // from being renamed/moved/deleted.
      usePolling: process.platform === 'win32',
      // Filter out artifacts from editors with atomic writes
      atomic: true,
      // Poll newly created files to detect when the write is finished
      awaitWriteFinish: {
        pollInterval: 200,
        stabilityThreshold: 1000
      },
      // With node 0.10 on linux, only polling is available
      interval: 1000,
      binaryInterval: 2000
    })

    const started = new Promise(resolve => {
      // Called by endInitialScan() once a flush has been fully handled
      this.initialScanParams.resolve = () => {
        stopInitialScanMeasure()
        resolve()
      }

      for (const eventType of [
        'add',
        'addDir',
        'change',
        'unlink',
        'unlinkDir'
      ]) {
        this.watcher.on(
          eventType,
          (path /*: ?string */, stats /*: ?fs.Stats */) => {
            const isInitialScan = !this.initialScanParams.flushed
            log.chokidar.debug(eventType, { path, stats, isInitialScan })
            const newEvent = chokidarEvent.build(eventType, path, stats)
            if (newEvent.type !== eventType) {
              log.debug('fixed wrong fsevents event type', {
                eventType,
                event: newEvent
              })
            }
            this.buffer.push(newEvent)
            this.events.emit('buffering-start')
          }
        )
      }

      this.watcher
        .on('ready', () => {
          stopChokidarScanMeasure()
          log.info('Folder scan done')
          this.buffer.switchMode('timeout')
        })
        .on('raw', async (event, path, details) => {
          log.chokidar.trace('raw', { event, path, details })

          // A rescan flag triggers a full restart of the watcher
          if (isRescanFlag(details.flags)) {
            try {
              await this.stop(true)
              await this.start()
            } catch (err) {
              this.fatal(err)
            }
          }
        })
        .on('error', err => {
          if (err.message === 'watch ENOSPC') {
            log.error(
              'Sorry, the kernel is out of inotify watches! ' +
                'See doc/usage/inotify.md for how to solve this issue.',
              { err, sentry: true }
            )
          } else {
            log.error('could not start chokidar watcher', { err, sentry: true })
          }
          this.fatal(err)
        })

      log.info(`Now watching ${this.syncPath}`)
    })

    return started
  }

  /**
   * Handle a batch of events flushed by the event buffer.
   *
   * Runs, in order: the initial scan step (only while the initial scan has not
   * been flushed yet), events preparation, analysis, paths normalization and
   * finally, under the Pouch lock, the sending of the changes to Prep.
   *
   * Emits `buffering-end` and `local-start` at the beginning, `sync-target`
   * and `local-end` once done (`sync-target` carries `-1` when the last
   * sequence could not be fetched).
   *
   * @param rawEvents - Events accumulated by the buffer since the last flush
   */
  // TODO: Start checksuming as soon as an add/change event is buffered
  // TODO: Put flushed event batches in a queue
  async onFlush(rawEvents /*: ChokidarEvent[] */) {
    log.info(`Flushed ${rawEvents.length} events`)

    this.events.emit('buffering-end')
    syncDir.ensureExistsSync(this)
    this.events.emit('local-start')

    let events = rawEvents.filter(hasPath) // @TODO handle root dir events

    if (!this.initialScanParams.flushed) {
      events = await initialScan.step(events, this)
    }

    if (events.length === 0) {
      this.events.emit('local-end')
      this.endInitialScan()
      return
    }

    log.trace('Prepare events...')
    const stopPrepareEventsMeasure = measureTime('LocalWatcher#prepareEvents')
    const preparedEvents /*: LocalEvent[] */ = await prepareEvents.step(
      events,
      this
    )
    stopPrepareEventsMeasure()
    log.trace('Done with events preparation.')

    const stopAnalysisMeasure = measureTime('LocalWatcher#analysis')
    const changes /*: LocalChange[] */ = analysis(preparedEvents, this)
    stopAnalysisMeasure()

    const stopNormalizePathsMeasure = measureTime('LocalWatcher#normalizePaths')
    const normalizedChanges /*: LocalChange[] */ = await normalizePaths.step(
      changes,
      this
    )
    stopNormalizePathsMeasure()

    // TODO: Don't even acquire lock changes list is empty
    // FIXME: Shouldn't we acquire the lock before preparing the events?
    const release = await this.pouch.lock(this)
    let target = -1
    try {
      const stopPrepAndMergeMeasure = measureTime('LocalWatcher#sendToPrep')
      await sendToPrep.step(normalizedChanges, this)
      stopPrepAndMergeMeasure()
      target = (await this.pouch.db.changes({ limit: 1, descending: true }))
        .last_seq
    } finally {
      // Always publish the target and release the lock, even on failure
      this.events.emit('sync-target', target)
      release()
      this.events.emit('local-end')
    }

    this.endInitialScan()
  }

  /**
   * Stop the underlying chokidar watcher and switch the buffer to idle mode.
   *
   * Does nothing (besides logging) when no watcher is running.
   *
   * @param force - When true, buffered events are dropped and the final one
   *   second delay is skipped
   */
  async stop(force /*: ?bool */ = false) {
    log.info('Stopping watcher...')

    // Work on a local reference: `this.watcher` may be cleared or replaced by
    // an overlapping stop()/start() call while this one is awaiting.
    const watcher = this.watcher
    if (!watcher) return

    if (force || !this.initialScanParams.flushed) {
      // Drop buffered events
      this.buffer.clear()
    } else {
      // XXX manually fire events for added file, because chokidar will cancel
      // them if they are still in the awaitWriteFinish period
      for (const relpath in watcher._pendingWrites) {
        try {
          const fullpath = path.join(watcher.options.cwd, relpath)
          const curStat = await stater.stat(fullpath)
          watcher.emit('add', relpath, curStat)
        } catch (err) {
          log.warn('Could not fire remaining add events', { err })
        }
      }
    }

    // Stop underlying Chokidar watcher
    await watcher.close()
    // Don't discard a watcher created by a start() call made in the meantime
    if (this.watcher === watcher) this.watcher = null
    // Flush buffer and stop flushes loop
    this.buffer.switchMode('idle')

    if (!force) {
      // Give some time for awaitWriteFinish events to be managed
      return new Promise(resolve => {
        setTimeout(resolve, 1000)
      })
    }
  }

  /**
   * Put the initial scan parameters back to their starting values: no path
   * seen, 3 empty dir retries left, not flushed and not done.
   *
   * Any previously stored `resolve` callback is discarded.
   */
  resetInitialScanParams() {
    this.initialScanParams = {
      paths: [],
      emptyDirRetryCount: 3,
      flushed: false,
      done: false
    }
  }

  /**
   * Mark the initial scan as done and call the `resolve` callback registered
   * by `start()`. Does nothing when no callback is registered.
   */
  endInitialScan() {
    if (this.initialScanParams.resolve) {
      this.initialScanParams.done = true
      this.initialScanParams.resolve()
    }
  }

  /* Helpers */

  /**
   * Request the checksum of a file from the checksumer.
   *
   * @param filePath - Path of the file, relative to the synchronized directory
   * @returns Whatever `checksumer.push()` resolves with for the absolute path
   */
  async checksum(filePath /*: string */) /*: Promise<string> */ {
    const absPath = path.join(this.syncPath, filePath)
    return this.checksumer.push(absPath)
  }

  /**
   * Register a listener called when the watcher hits a fatal error.
   *
   * @param listener - Called with the error passed to `fatal()`
   */
  onFatal(listener /*: Error => any */) /*: void */ {
    this.events.on(LOCAL_WATCHER_FATAL_EVENT, listener)
  }

  /**
   * Report a fatal error: log it, notify the fatal listeners, remove them and
   * stop the watcher.
   *
   * The watcher is stopped in the background; a failure to stop is logged
   * instead of being left as an unhandled promise rejection.
   *
   * @param err - The error that made the watcher unusable
   */
  fatal(err /*: Error */) /*: void */ {
    log.fatal(`Local watcher fatal: ${err.message}`, { err, sentry: true })
    this.events.emit(LOCAL_WATCHER_FATAL_EVENT, err)
    this.events.removeAllListeners(LOCAL_WATCHER_FATAL_EVENT)
    this.stop().catch(stopErr => {
      log.error('could not stop watcher after fatal error', { err: stopErr })
    })
  }
}

module.exports = LocalWatcher