cozy-labs/cozy-desktop

View on GitHub
core/local/channel_watcher/incomplete_fixer.js

Summary

Maintainability
B
4 hrs
Test Coverage
/** Try to fix incomplete events from previous steps.
 *
 * When a file is added or updated, and it is moved just after, the first
 * event is marked as incomplete by addChecksum because we cannot compute the
 * checksum at the given path. But the event is still relevant, in particular
 * if a directory that is an ancestor of this file has been moved. With the
 * renamed event, by comparing the paths, we can extrapolate the new path and
 * check with a stat call whether there is a file at that location.
 *
 * Cf test/property/local_watcher/swedish_krona.json
 *
 * @module core/local/channel_watcher/incomplete_fixer
 * @flow
 */

const path = require('path')

const stater = require('../stater')
const logger = require('../../utils/logger')

/** Name of this step.
 *
 * Used as the suffix of the logger component and as the key under which
 * rebuilt events store the incomplete/completing pair they were built from.
 */
const STEP_NAME = 'incompleteFixer'

/** Logger scoped to this step. */
const log = logger({
  component: `ChannelWatcher/${STEP_NAME}`
})

/** Drop incomplete events after this delay (in milliseconds).
 *
 * TODO: tweak the value (the initial value was chosen because it looks like a
 * good value, it is not something that was computed).
 */
const DELAY = 3000

/*::
import type Channel from './channel'
import type { ChannelEvent, ChannelBatch } from './event'
import type { Checksumer } from '../checksumer'
import type { Config } from '../../config'
import type { Pouch } from '../../pouch'
import type { Metadata } from '../../metadata'

type IncompleteItem = {
  event: ChannelEvent,
  timestamp: number,
}

type IncompleteFixerOptions = {
  config: Config,
  checksumer: Checksumer,
  pouch: Pouch,
  fatal: Error => any,
}

type Completion =
  | {| rebuilt: ChannelEvent |}
  | {| ignored: true |}
*/

// Public API of the module: `loop` plugs the step into a channel while `step`
// builds the batch handler itself from an explicit state and options.
module.exports = {
  loop,
  step
}

/** Tell whether `nextEvent` carries an `oldPath` which is either the path of
 * `previousEvent` or one of its ancestors.
 *
 * A separator is appended to both paths before comparing them so that only
 * whole path segments can match (i.e. `foo` is not an ancestor of `foobar`).
 *
 * The body is kept as a single expression because of the `%checks` predicate.
 */
function wasRenamedSuccessively(
  previousEvent /*: ChannelEvent */,
  nextEvent /*: ChannelEvent */
) /*: boolean %checks */ {
  return (
    nextEvent.oldPath != null &&
    `${previousEvent.path}${path.sep}`.startsWith(
      `${nextEvent.oldPath}${path.sep}`
    )
  )
}

/** Tell whether `nextEvent` is a `deleted` event for the path of
 * `previousEvent` (or one of its ancestors) while `previousEvent` has an
 * `oldPath`.
 *
 * A separator is appended to both paths so that only whole path segments can
 * match.
 */
function itemDestinationWasDeleted(
  previousEvent /*: ChannelEvent */,
  nextEvent /*: ChannelEvent */
) /*: boolean %checks */ {
  return Boolean(
    nextEvent.action === 'deleted' &&
      previousEvent.oldPath &&
      `${previousEvent.path}${path.sep}`.startsWith(
        `${nextEvent.path}${path.sep}`
      )
  )
}

/** Tell whether `nextEvent` is a `created` event, of the same kind as
 * `previousEvent`, at the `oldPath` of `previousEvent`.
 */
function renamedItemWasReplaced(
  previousEvent /*: ChannelEvent */,
  nextEvent /*: ChannelEvent */
) /*: boolean %checks */ {
  return Boolean(
    previousEvent.oldPath &&
      nextEvent.action === 'created' &&
      nextEvent.kind === previousEvent.kind &&
      nextEvent.path === previousEvent.oldPath
  )
}

/** Replace the leading `oldAncestor` of `docPath` with `newAncestor`.
 *
 * The replacement only happens when `oldAncestor` is `docPath` itself or one
 * of its ancestors (i.e. it matches whole leading path segments). Otherwise
 * `docPath` is returned untouched.
 *
 * The new path is built by concatenation rather than with `String#replace` so
 * that `$` sequences in file names (e.g. `$&` or `$$`) are kept literally
 * instead of being interpreted as replacement patterns.
 */
function replaceAncestorPath(
  docPath /*: string */,
  oldAncestor /*: string */,
  newAncestor /*: string */
) /*: string */ {
  if (docPath === oldAncestor) return newAncestor
  if (docPath.startsWith(oldAncestor + path.sep)) {
    return newAncestor + docPath.slice(oldAncestor.length)
  }
  return docPath
}

/** Compute the paths of the incomplete `previousEvent` once the move described
 * by `nextEvent` (from `nextEvent.oldPath` to `nextEvent.path`) is applied.
 *
 * - `path` is the path of `previousEvent` relocated under the new location.
 * - `oldPath` is only returned when `previousEvent` has one. When the
 *   relocated path is exactly `nextEvent.path`, the `oldPath` of
 *   `previousEvent` is kept as is. Otherwise it is relocated too, but only if
 *   it lives under `nextEvent.oldPath`.
 *
 * @returns the completed `path` and, if any, the completed `oldPath`
 */
function completeEventPaths(
  previousEvent /*: ChannelEvent */,
  nextEvent /*: ChannelEvent */
) /*: { path: string, oldPath?: string } */ {
  const completedPath = replaceAncestorPath(
    previousEvent.path,
    // $FlowFixMe: `renamed` events always have oldPath
    nextEvent.oldPath,
    nextEvent.path
  )

  if (!previousEvent.oldPath) {
    return { path: completedPath }
  }

  const completedOldPath =
    completedPath === nextEvent.path
      ? previousEvent.oldPath
      : replaceAncestorPath(
          previousEvent.oldPath,
          // $FlowFixMe: `renamed` events always have oldPath
          nextEvent.oldPath,
          nextEvent.path
        )

  return { path: completedPath, oldPath: completedOldPath }
}

/** Rebuild the incomplete `previousEvent` with the paths obtained by applying
 * `nextEvent` to it.
 *
 * The rebuilt path is looked up on the filesystem under the sync path:
 * - when something is found, its stats and kind are used and, for files, a
 *   checksum is requested from the checksumer;
 * - when nothing is found, the rebuilt event keeps the kind of
 *   `previousEvent` and is itself marked as incomplete.
 *
 * @returns `{ ignored: true }` when the rebuilt event would have identical
 *   `path` and `oldPath`, `{ rebuilt }` otherwise
 */
async function rebuildIncompleteEvent(
  previousEvent /*: ChannelEvent */,
  nextEvent /*: ChannelEvent */,
  opts /*: { config: Config , checksumer: Checksumer, pouch: Pouch } */
) /*: Promise<Completion> */ {
  const completedPaths = completeEventPaths(previousEvent, nextEvent)

  // Source and destination would be the same: there is nothing to emit.
  if (completedPaths.path === completedPaths.oldPath) {
    return { ignored: true }
  }

  const absPath = path.join(opts.config.syncPath, completedPaths.path)
  const stats = await stater.statMaybe(absPath)

  let kind = previousEvent.kind
  let md5sum
  if (stats) {
    kind = stater.kind(stats)
    if (kind === 'file') {
      md5sum = await opts.checksumer.push(absPath)
    }
  }

  const rebuilt /*: ChannelEvent */ = {
    [STEP_NAME]: {
      incompleteEvent: previousEvent,
      completingEvent: nextEvent
    },
    action: previousEvent.action,
    path: completedPaths.path,
    kind,
    md5sum
  }
  if (completedPaths.oldPath) {
    rebuilt.oldPath = completedPaths.oldPath
  }
  if (stats) {
    rebuilt.stats = stats
  }
  if (stats == null) {
    // Still nothing at the rebuilt path: the event remains incomplete.
    rebuilt.incomplete = true
  }

  return { rebuilt }
}

/** Build an event with the action of `nextEvent` targeting the `oldPath` of
 * `previousEvent` (and keeping its kind).
 *
 * The pair of events the result was built from is stored under the step name.
 */
function buildDeletedFromRenamed(
  previousEvent /*: ChannelEvent */,
  nextEvent /*: ChannelEvent */
) /*: Completion */ {
  const rebuilt = {
    [STEP_NAME]: {
      incompleteEvent: previousEvent,
      completingEvent: nextEvent
    },
    action: nextEvent.action,
    // $FlowFixMe: renamed events always have an oldPath
    path: previousEvent.oldPath,
    kind: previousEvent.kind
  }

  return { rebuilt }
}

/** Build a `modified` event from a copy of `nextEvent`.
 *
 * The pair of events the result was built from is stored under the step name.
 */
function buildModifiedFromRenamed(
  previousEvent /*: ChannelEvent */,
  nextEvent /*: ChannelEvent */
) /*: Completion */ {
  const origin = {
    incompleteEvent: previousEvent,
    completingEvent: nextEvent
  }
  const rebuilt = { ...nextEvent, action: 'modified', [STEP_NAME]: origin }

  return { rebuilt }
}

/** Store `event` in the right collection: complete events go to `batch`,
 * incomplete ones are wrapped with the current timestamp and added to
 * `incompletes`.
 */
function keepEvent(event, { incompletes, batch }) {
  if (!event.incomplete) {
    batch.add(event)
    return
  }

  incompletes.add({ event, timestamp: Date.now() })
}

/** Plug the incomplete fixer step into `channel`.
 *
 * A fresh, empty list of incomplete events is created for the step and
 * `opts.fatal` is handed to `asyncMap` along with the batch handler.
 *
 * @returns the value returned by `channel.asyncMap`
 */
function loop(
  channel /*: Channel */,
  opts /*: IncompleteFixerOptions */
) /*: Channel */ {
  const state = { incompletes: [] }
  const fixBatch = step(state, opts)

  return channel.asyncMap(fixBatch, opts.fatal)
}

/** Build the batch handler of the incomplete fixer step.
 *
 * The returned async function takes a batch of events and:
 * 1. stores its incomplete events (except the `ignored` ones) in
 *    `state.incompletes` with the current timestamp;
 * 2. for each event of the batch, drops the stored incomplete events older
 *    than `DELAY` and then, if the event is a `renamed`, `deleted` or
 *    `created` one, tries to complete each remaining incomplete event with it.
 *
 * `state.incompletes` is mutated in place so the pending incomplete events are
 * carried over from one batch to the next.
 *
 * @returns a function resolving with the complete events to pass along
 */
function step(
  state /*: { incompletes: IncompleteItem[] } */,
  opts /*: IncompleteFixerOptions */
) {
  return async (events /*: ChannelBatch */) /*: Promise<ChannelBatch> */ => {
    // A Set is used since the same event can be kept more than once below.
    const batch = new Set()

    // Filter incomplete events
    for (const event of events) {
      if (event.incomplete && event.action !== 'ignored') {
        log.debug({ path: event.path, action: event.action }, 'incomplete')
        state.incompletes.push({ event, timestamp: Date.now() })
      }
    }

    // Let's see if we can match an incomplete event with a renamed or deleted event
    for (const event of events) {
      const now = Date.now()
      for (let i = 0; i < state.incompletes.length; i++) {
        const item = state.incompletes[i]

        // Remove the expired incomplete events
        if (item.timestamp + DELAY < now) {
          log.debug(
            { path: item.event.path, event: item.event },
            'Dropping expired incomplete event'
          )
          state.incompletes.splice(i, 1)
          // Stay on the same index since the next item took this one's place.
          i--
        }
      }

      if (
        state.incompletes.length === 0 ||
        !['renamed', 'deleted', 'created'].includes(event.action)
      ) {
        // Nothing to complete or not a completing event: pass it along unless
        // it is itself incomplete (in which case it was stored above).
        if (!event.incomplete) {
          batch.add(event)
        }
        continue
      }

      // Incomplete events that will still be pending once `event` has been
      // checked against all of them.
      const incompletes = new Set()
      // Items are deliberately not removed from `state.incompletes` while
      // iterating: the list is rebuilt from `incompletes` once all items have
      // been checked, and removing an entry here would shift the following one
      // under the cursor, making the loop skip (and thus lose) it.
      for (const item of state.incompletes) {
        try {
          const completion = await detectCompletion(item.event, event, opts)
          if (!completion) {
            // `event` does not complete this item: keep both.
            incompletes.add(item)
            if (!event.incomplete) {
              batch.add(event)
            }
            continue
          } else if (completion.ignored) {
            incompletes.add(item)
            continue
          }

          const { rebuilt } = completion
          log.debug({ path: event.path, rebuilt }, 'rebuilt event')

          // If the incomplete event is for a document that was previously saved
          // (e.g. a temporary document now renamed), we'll want to make sure the old
          // document is removed to avoid having 2 documents with the same inode.
          // We can do this by keeping the completing renamed event.
          const incompleteForExistingDoc /*: ?Metadata */ =
            await opts.pouch.byLocalPath(item.event.path)
          if (
            incompleteForExistingDoc &&
            !incompleteForExistingDoc.trashed &&
            (item.event.action === 'created' || item.event.action === 'scan')
          ) {
            // Simply drop the incomplete event since we already have a document at this
            // path in Pouch.
            keepEvent(event, { incompletes, batch })
          } else if (
            incompleteForExistingDoc &&
            !incompleteForExistingDoc.trashed &&
            item.event.action === 'modified'
          ) {
            // Keep the completed modified event to make sure we don't miss any
            // content changes but process the completing event (i.e. probably a
            // renamed event) to make sure we apply the modifications on the right
            // document.
            keepEvent(event, { incompletes, batch })
            keepEvent(rebuilt, { incompletes, batch })
          } else if (rebuilt.path.startsWith(event.path + path.sep)) {
            // Keep the completing event if it's the parent of the completed event since
            // they don't apply to the same document.
            keepEvent(event, { incompletes, batch })
            keepEvent(rebuilt, { incompletes, batch })
          } else {
            // When the rebuilt event is still incomplete, `keepEvent` stores
            // it in `incompletes` where it replaces the original item (which
            // is not added back).
            keepEvent(rebuilt, { incompletes, batch })
          }
        } catch (err) {
          log.warn(
            { err, event, item },
            'Error while rebuilding incomplete event'
          )
          // If we have an error, there is probably not much that we can do
          // (the item is not added back to the pending incomplete events).
        }
      }
      // Replace the state's incompletes with the newly built set.
      state.incompletes.length = 0
      state.incompletes.push(...incompletes)
    }

    return Array.from(batch)
  }
}

/** Try to complete the incomplete `previousEvent` with `nextEvent`.
 *
 * The scenarios are checked in order and the first matching one wins:
 * 1. the document (or one of its ancestors) was renamed again;
 * 2. the destination of a renamed document was deleted;
 * 3. a document of the same kind was created at the old path of a renamed one.
 *
 * @returns the completion, or nothing when `nextEvent` does not match any
 *   scenario
 */
async function detectCompletion(
  previousEvent /*: ChannelEvent */,
  nextEvent /*: ChannelEvent */,
  opts /*: IncompleteFixerOptions */
) /*: Promise<?Completion> */ {
  if (wasRenamedSuccessively(previousEvent, nextEvent)) {
    // We have a match, try to rebuild the incomplete event
    return rebuildIncompleteEvent(previousEvent, nextEvent, opts)
  }

  if (itemDestinationWasDeleted(previousEvent, nextEvent)) {
    // We have a match, try to replace the incomplete event
    return buildDeletedFromRenamed(previousEvent, nextEvent)
  }

  if (renamedItemWasReplaced(previousEvent, nextEvent)) {
    return buildModifiedFromRenamed(previousEvent, nextEvent)
  }

  // No match: implicitly resolve with undefined.
}