cozy-labs/cozy-desktop

View on GitHub
core/local/channel_watcher/overwrite.js

Summary

Maintainability
A
2 hrs
Test Coverage
/** This step handles file additions/moves overwriting their destination.
 *
 * @module core/local/channel_watcher/overwrite
 * @flow
 */

const _ = require('lodash')

const Channel = require('./channel')
const { logger } = require('../../utils/logger')
const { measureTime } = require('../../utils/perfs')

/*::
import type { ChannelEvent, ChannelBatch } from './event'
import type { Metadata } from '../../metadata'

type PendingBatch = {
  deletedEventsByPath: Map<string, ChannelEvent>,
  events: ChannelBatch,
  timeout?: TimeoutID
}

type OverwriteState = {
  deletedEventsByPath: Map<string, ChannelEvent>,
  pendingBatches: PendingBatch[]
}

type OverwriteOptions = {
  state: {
    [typeof STEP_NAME]: OverwriteState
  }
}
*/

// Name of this step. It is used as the key of this step's slice in the shared
// options state, as the key under which this step attaches details to the
// events it modifies, and as a suffix of the logger component name.
const STEP_NAME = 'overwrite'

// Logger scoped to this step.
const log = logger({
  component: `ChannelWatcher/${STEP_NAME}`
})

/**
 * Wait at most this delay (in milliseconds) to fix overwriting move related
 * events.
 *
 * This is the timer delay applied to each batch when it becomes pending.
 */
const DELAY = 500

/** Build the initial, empty state of this step.
 *
 * The returned object has a single property, named after the step, holding:
 * - `deletedEventsByPath`: an empty Map for the batch being processed.
 * - `pendingBatches`: an empty list of batches waiting to be output.
 */
const initialState = () => {
  // eslint-disable-next-line
  const deletedEventsByPath = new Map /*:: <string,ChannelEvent> */ ()
  const overwriteState = { deletedEventsByPath, pendingBatches: [] }

  return { [STEP_NAME]: overwriteState }
}

/** Turn the current batch into a pending one.
 *
 * The pending batch keeps the batch events together with the index of deleted
 * events collected while processing it, so that:
 * - Overwriting moves/additions can still be detected in the next batches.
 * - Deleted events of the (now pending) batch can still be updated.
 *
 * A timer is armed to call `output(state)` once DELAY milliseconds have
 * elapsed, and the state gets a fresh, empty index for the next batch.
 */
const rotateState = (state, events, output) => {
  const { deletedEventsByPath } = state
  const onDelayElapsed = () => {
    output(state)
  }

  const pendingBatch /*: PendingBatch */ = {
    deletedEventsByPath,
    events,
    timeout: setTimeout(onDelayElapsed, DELAY)
  }
  state.pendingBatches.push(pendingBatch)

  // Start the next batch with an empty index.
  state.deletedEventsByPath = new Map()
}

/** Record a deleted event in the index of the current batch.
 *
 * Events are indexed by path. Events with any other action are left out.
 * A later deleted event on the same path replaces the previous entry.
 */
const indexDeletedEvent = (event, state) => {
  if (event.action !== 'deleted') return

  const { path } = event
  state.deletedEventsByPath.set(path, event)
}

/** Look up the deleted event recorded for the given path, if any.
 *
 * The index of the current batch is searched first, then the indexes of the
 * pending batches in the order they are stored. Returns `undefined` when no
 * index contains the path.
 */
const findDeletedEvent = (path, state) => {
  const currentIndex = state.deletedEventsByPath
  if (currentIndex.has(path)) return currentIndex.get(path)

  const holder = state.pendingBatches.find(({ deletedEventsByPath }) =>
    deletedEventsByPath.has(path)
  )
  return holder ? holder.deletedEventsByPath.get(path) : undefined
}

/** Handle a renamed event whose path was previously deleted.
 *
 * When a deleted event is known for the path of the given event:
 * - The given event is flagged with `overwrite = true` and receives a copy of
 *   the deleted event.
 * - The deleted event has its action changed to 'ignored' and receives a copy
 *   of the given event.
 *
 * Nothing happens when no deleted event is known for that path.
 */
const ignoreDeletedBeforeOverwritingMove = (event, state) => {
  const deletedEvent = findDeletedEvent(event.path, state)
  if (!deletedEvent) return

  // Both copies are shallow and taken before any mutation below, so each
  // event keeps a record of the other as it was when the match was found.
  const deletedSnapshot = _.clone(deletedEvent)
  const renamedSnapshot = _.clone(event)

  event.overwrite = true
  _.set(event, [STEP_NAME, 'moveToDeletedPath'], deletedSnapshot)

  deletedEvent.action = 'ignored'
  _.set(deletedEvent, [STEP_NAME, 'deletedBeforeRenamed'], renamedSnapshot)
}

/** Handle a created event whose path was previously deleted.
 *
 * When a deleted event is known for the path of the given event, it is treated
 * as a deletion preceding a replacement:
 * - The given event receives a copy of the deleted event.
 * - The deleted event has its action changed to 'ignored' and receives a copy
 *   of the given event.
 *
 * We expect the Merge step to be able to merge the created event as is, even
 * without deleting the document first. Ignoring the deleted event is meant to
 * prevent the original file from being moved to the trash.
 *
 * Nothing happens when no deleted event is known for that path.
 */
const ignoreDeletedBeforeOverwritingAdd = (event, state) => {
  const deletedEvent = findDeletedEvent(event.path, state)
  if (!deletedEvent) return

  // Both copies are shallow and taken before any mutation below.
  const deletedSnapshot = _.clone(deletedEvent)
  const createdSnapshot = _.clone(event)

  _.set(event, [STEP_NAME, 'createOnDeletedPath'], deletedSnapshot)

  deletedEvent.action = 'ignored'
  _.set(deletedEvent, [STEP_NAME, 'deletedBeforeCreate'], createdSnapshot)
}

/** Process an event batch.
 *
 * Every event is first offered to the deleted events index, then:
 * - 'renamed' events are checked for an overwriting move.
 * - 'created' events are checked for an overwriting addition.
 *
 * Events are modified in place; the returned promise resolves with no value.
 */
const step = async (
  batch /*: ChannelBatch */,
  opts /*: OverwriteOptions */
) => {
  const state = opts.state[STEP_NAME]

  for (const event of batch) {
    indexDeletedEvent(event, state)

    switch (event.action) {
      case 'renamed':
        ignoreDeletedBeforeOverwritingMove(event, state)
        break
      case 'created':
        ignoreDeletedBeforeOverwritingAdd(event, state)
        break
    }
  }
}

/** Endlessly pop batches from `channel`, process them and make them pending.
 *
 * Each popped batch goes through `step()` and is then handed to
 * `rotateState()` together with a callback which removes the first pending
 * batch from the state and pushes its events to `out`.
 */
const _loop = async (channel, out, opts) => {
  const pushOldestPending = state => {
    const oldest = state.pendingBatches.shift()
    if (oldest) out.push(oldest.events)
  }

  for (;;) {
    const batch = await channel.pop()
    const stopMeasure = measureTime('LocalWatcher#overwriteStep')

    const overwriteState = opts.state[STEP_NAME]

    await step(batch, opts)

    rotateState(overwriteState, batch, pushOldestPending)
    stopMeasure()
  }
}

/** Start the step on the given input channel.
 *
 * Returns a new Channel on which the processing loop pushes its output.
 * The loop runs in the background; if it rejects, the error is logged.
 */
const loop = (channel /*: Channel */, opts /*: OverwriteOptions */) => {
  const out = new Channel()
  const reportFailure = err => {
    log.error({ err })
  }

  _loop(channel, out, opts).catch(reportFailure)

  return out
}

// Public interface of the step: its name, its initial state factory, the
// function starting the processing loop and the single batch processor.
module.exports = {
  STEP_NAME,
  initialState,
  loop,
  step
}