remotestorage/remotestorage.js

View on GitHub
src/sync.ts

Summary

Maintainability
F
6 days
Test Coverage
import type { RSNode, RSNodes } from './interfaces/rs_node';
import config from './config';
import Env from './env';
import EventHandling from './eventhandling';
import log from './log';
import Authorize from './authorize';
import SyncError from './sync-error';
import UnauthorizedError from './unauthorized-error';
import {
  applyMixins,
  deepClone,
  equal,
  isFolder,
  isDocument,
  pathsFromRoot
} from './util';

let syncCycleCb, syncOnConnect;

interface ResponseStatus {
  statusCode: string | number;
  successful: boolean | undefined;
  conflict: boolean | undefined;
  unAuth: boolean | undefined;
  notFound: boolean | undefined;
  changed: boolean | undefined;
  networkProblems: boolean | undefined;
}

interface SyncTask {
  action: any;
  path: string;
  promise: Promise<any>;
}

function taskFor (action, path: string, promise: Promise<any>): SyncTask {
  return { action, path, promise };
}

function nodeChanged (node: RSNode, etag: string): boolean {
  return node.common.revision !== etag &&
         (!node.remote || node.remote.revision !== etag);
}

function isStaleChild (node: RSNode): boolean {
  return node.remote && node.remote.revision && !node.remote.itemsMap && !node.remote.body;
}

function hasCommonRevision (node: RSNode): boolean {
  return node.common && node.common.revision;
}

function hasNoRemoteChanges (node: RSNode): boolean {
  if (node.remote && node.remote.revision &&
      node.remote.revision !== node.common.revision) {
    return false;
  }
  return (node.common.body === undefined && node.remote.body === false) ||
    (node.remote.body === node.common.body &&
     node.remote.contentType === node.common.contentType);
}

function mergeMutualDeletion (node: RSNode): RSNode {
  if (node.remote && node.remote.body === false &&
      node.local && node.local.body === false) {
    delete node.local;
  }
  return node;
}

function handleVisibility (env, rs): void {
  function handleChange (isForeground): void {
    const oldValue = rs.getCurrentSyncInterval();
    config.isBackground = !isForeground;
    const newValue = rs.getCurrentSyncInterval();
    rs._emit('sync-interval-change', {oldValue: oldValue, newValue: newValue});
  }
  env.on('background', () => handleChange(false));
  env.on('foreground', () => handleChange(true));
}

/**
 * Class: RemoteStorage.Sync
 *
 * This class basically does six things:
 *
 * - retrieve the remote version of relevant documents and folders
 * - add all local and remote documents together into one tree
 * - push local documents out if they don't exist remotely
 * - push local changes out to remote documents (conditionally, to avoid race
 *   conditions where both have changed)
 * - adopt the local version of a document to its remote version if both exist
 *   and they differ
 * - delete the local version of a document if it was deleted remotely
 * - if any GET requests were waiting for remote data, resolve them once this
 *   data comes in.
 *
 * It does this using requests to documents and folders. Whenever a folder GET
 * comes in, it gives information about all the documents it contains (this is
 * the `markChildren` function).
 **/
class Sync {
  // TODO remove when RS is defined, or if unnecessary
  rs: { [propName: string]: any };

  numThreads: number;
  done: boolean;
  stopped: boolean;

  // TODO define in more detail
  _tasks: object;
  _running: object;
  _timeStarted: object;
  _finishedTasks: Array<SyncTask> = [];

  constructor (remoteStorage: object) {
    this.rs = remoteStorage;

    this._tasks       = {};
    this._running     = {};
    this._timeStarted = {};

    this.numThreads = 10;

    this.rs.local.onDiff(path => {
      this.addTask(path);
      this.doTasks();
    });

    this.rs.caching.onActivate((path: string): void => {
      this.addTask(path);
      this.doTasks();
    });

    this.addEvents(['done', 'req-done']);
  }

  public now (): number {
    return new Date().getTime();
  }

  public queueGetRequest (path: string): object {
    return new Promise((resolve, reject) => {
      if (!this.rs.remote.connected) {
        reject('cannot fulfill maxAge requirement - remote is not connected');
      } else if (!this.rs.remote.online) {
        reject('cannot fulfill maxAge requirement - remote is not online');
      } else {
        this.addTask(path, function (): void {
          this.rs.local.get(path).then(r => resolve(r));
        }.bind(this));

        this.doTasks();
      }
    });
  }

  // FIXME force02 sounds like rs spec 02, thus could be removed
  public corruptServerItemsMap (itemsMap, force02?: boolean): boolean {
    if ((typeof(itemsMap) !== 'object') || (Array.isArray(itemsMap))) {
      return true;
    }

    for (const itemName in itemsMap) {
      const item = itemsMap[itemName];

      if (typeof(item) !== 'object') {
        return true;
      }
      if (typeof(item.ETag) !== 'string') {
        return true;
      }
      if (isFolder(itemName)) {
        if (itemName.substring(0, itemName.length-1).indexOf('/') !== -1) {
          return true;
        }
      } else {
        if (itemName.indexOf('/') !== -1) {
          return true;
        }
        if (force02) {
          if (typeof(item['Content-Type']) !== 'string') {
            return true;
          }
          if (typeof(item['Content-Length']) !== 'number') {
            return true;
          }
        }
      }
    }

    return false;
  }

  public corruptItemsMap (itemsMap): boolean {
    if ((typeof(itemsMap) !== 'object') || (Array.isArray(itemsMap))) {
      return true;
    }

    for (const path in itemsMap) {
      if (typeof itemsMap[path] !== 'boolean') {
        return true;
      }
    }

    return false;
  }

  public corruptRevision (rev): boolean {
    return ((typeof(rev) !== 'object') ||
            (Array.isArray(rev)) ||
            (rev.revision && typeof(rev.revision) !== 'string') ||
            (rev.body && typeof(rev.body) !== 'string' && typeof(rev.body) !== 'object') ||
            (rev.contentType && typeof(rev.contentType) !== 'string') ||
            (rev.contentLength && typeof(rev.contentLength) !== 'number') ||
            (rev.timestamp && typeof(rev.timestamp) !== 'number') ||
            (rev.itemsMap && this.corruptItemsMap(rev.itemsMap)));
  }

  public isCorrupt (node: RSNode): boolean {
    return ((typeof(node) !== 'object') ||
            (Array.isArray(node)) ||
            (typeof(node.path) !== 'string') ||
            (this.corruptRevision(node.common)) ||
            (node.local && this.corruptRevision(node.local)) ||
            (node.remote && this.corruptRevision(node.remote)) ||
            (node.push && this.corruptRevision(node.push)));
  }

  public hasTasks (): boolean {
    return Object.getOwnPropertyNames(this._tasks).length > 0;
  }

  public async collectDiffTasks (): Promise<number> {
    let num = 0;

    return this.rs.local.forAllNodes((node: RSNode) => {
      if (num > 100) { return; }

      if (this.isCorrupt(node)) {
        log('[Sync] WARNING: corrupt node in local cache', node);
        if (typeof(node) === 'object' && node.path) {
          this.addTask(node.path);
          num++;
        }
      } else if (this.needsFetch(node) && this.rs.access.checkPathPermission(node.path, 'r')) {
        this.addTask(node.path);
        num++;
      } else if (isDocument(node.path) && this.needsPush(node) &&
                 this.rs.access.checkPathPermission(node.path, 'rw')) {
        this.addTask(node.path);
        num++;
      }
    })
    .then((): number => num)
    .catch(e => { throw e; });
  }

  public inConflict (node: RSNode): boolean {
    return (node.local && node.remote &&
            (node.remote.body !== undefined || node.remote.itemsMap));
  }

  public needsRefresh (node: RSNode): boolean {
    if (node.common) {
      if (!node.common.timestamp) {
        return true;
      }
      return (this.now() - node.common.timestamp > config.syncInterval);
    }
    return false;
  }

  public needsFetch (node: RSNode): boolean {
    if (this.inConflict(node)) {
      return true;
    }
    if (node.common &&
        node.common.itemsMap === undefined &&
        node.common.body === undefined) {
      return true;
    }
    if (node.remote &&
        node.remote.itemsMap === undefined &&
        node.remote.body === undefined) {
      return true;
    }
    return false;
  }

  public needsPush (node: RSNode): boolean {
    if (this.inConflict(node)) {
      return false;
    }
    if (node.local && !node.push) {
      return true;
    }
  }

  public needsRemotePut (node: RSNode): boolean {
    return node.local && node.local.body;
  }

  public needsRemoteDelete (node: RSNode): boolean {
    return node.local && node.local.body === false;
  }

  public getParentPath (path: string): string {
    const parts = path.match(/^(.*\/)([^\/]+\/?)$/);

    if (parts) {
      return parts[1];
    } else {
      throw new Error('Not a valid path: "'+path+'"');
    }
  }

  public deleteChildPathsFromTasks (): void {
    for (const path in this._tasks) {
      const paths = pathsFromRoot(path);

      for (let i=1; i<paths.length; i++) {
        if (this._tasks[paths[i]]) {
          // move pending promises to parent task
          if (Array.isArray(this._tasks[path]) && this._tasks[path].length) {
            Array.prototype.push.apply(
              this._tasks[paths[i]],
              this._tasks[path]
            );
          }
          delete this._tasks[path];
        }
      }
    }
  }

  public async collectRefreshTasks (): Promise<void> {
    return this.rs.local.forAllNodes((node: RSNode) => {
      let parentPath: string;
      if (this.needsRefresh(node)) {
        try {
          parentPath = this.getParentPath(node.path);
        } catch(e) {
          // node.path is already '/', can't take parentPath
        }
        if (parentPath && this.rs.access.checkPathPermission(parentPath, 'r')) {
          this.addTask(parentPath);
        } else if (this.rs.access.checkPathPermission(node.path, 'r')) {
          this.addTask(node.path);
        }
      }
    })
    .then(() => this.deleteChildPathsFromTasks())
    .catch((e: Error) => { throw e; });
  }

  public flush (nodes: RSNodes): RSNodes {
    for (const path in nodes) {
      // Strategy is 'FLUSH' and no local changes exist
      if (this.rs.caching.checkPath(path) === 'FLUSH' &&
          nodes[path] && !nodes[path].local) {
        log('[Sync] Flushing', path);
        nodes[path] = undefined; // Cause node to be flushed from cache
      }
    }
    return nodes;
  }

  public doTask (path: string): object {
    return this.rs.local.getNodes([path]).then((nodes: RSNodes) => {
      const node = nodes[path];
      // First fetch:
      if (typeof(node) === 'undefined') {
        return taskFor('get', path, this.rs.remote.get(path));
      }
      // Fetch known-stale child:
      else if (isStaleChild(node)) {
        return taskFor('get', path, this.rs.remote.get(path));
      }
      // Push PUT:
      else if (this.needsRemotePut(node)) {
        node.push = deepClone(node.local);
        node.push.timestamp = this.now();

        return this.rs.local.setNodes(this.flush(nodes)).then(() => {
          let options;
          if (hasCommonRevision(node)) {
            options = { ifMatch: node.common.revision };
          } else {
            // Initial PUT (fail if something is already there)
            options = { ifNoneMatch: '*' };
          }

          return taskFor('put', path,
            this.rs.remote.put(path, node.push.body, node.push.contentType, options)
          );
        });
      }
      // Push DELETE:
      else if (this.needsRemoteDelete(node)) {
        node.push = { body: false, timestamp: this.now() };

        return this.rs.local.setNodes(this.flush(nodes)).then(() => {
          if (hasCommonRevision(node)) {
            return taskFor('delete', path,
              this.rs.remote.delete(path, { ifMatch: node.common.revision })
            );
          } else { // Ascertain current common or remote revision first
            return taskFor('get', path, this.rs.remote.get(path));
          }
        });
      }
      // Conditional refresh:
      else if (hasCommonRevision(node)) {
        return taskFor('get', path,
          this.rs.remote.get(path, { ifNoneMatch: node.common.revision })
        );
      }
      else {
        return taskFor('get', path, this.rs.remote.get(path));
      }
    });
  }

  public autoMergeFolder (node: RSNode): RSNode {
    if (node.remote.itemsMap) {
      node.common = node.remote;
      delete node.remote;

      if (node.common.itemsMap) {
        for (const itemName in node.common.itemsMap) {
          if (!node.local.itemsMap[itemName]) {
            // Indicates the node is either newly being fetched
            // has been deleted locally (whether or not leading to conflict);
            // before listing it in local listings, check if a local deletion
            // exists.
            node.local.itemsMap[itemName] = false;
          }
        }

        if (equal(node.local.itemsMap, node.common.itemsMap)) {
          delete node.local;
        }
      }
    }
    return node;
  }

  public autoMergeDocument (node: RSNode): RSNode {
    if (hasNoRemoteChanges(node)) {
      node = mergeMutualDeletion(node);
      delete node.remote;
    } else if (node.remote.body !== undefined) {
      // keep/revert:
      log('[Sync] Emitting keep/revert');

      this.rs.local._emitChange({
        origin:         'conflict',
        path:           node.path,
        oldValue:       node.local.body,
        newValue:       node.remote.body,
        lastCommonValue: node.common.body,
        oldContentType: node.local.contentType,
        newContentType: node.remote.contentType,
        lastCommonContentType: node.common.contentType
      });

      if (node.remote.body) {
        node.common = node.remote;
      } else {
        node.common = {};
      }
      delete node.remote;
      delete node.local;
    }

    return node;
  }

  public autoMerge (node: RSNode): RSNode {
    if (node.remote) {
      if (node.local) {
        if (isFolder(node.path)) {
          return this.autoMergeFolder(node);
        } else {
          return this.autoMergeDocument(node);
        }
      } else { // no local changes
        if (isFolder(node.path)) {
          if (node.remote.itemsMap !== undefined) {
            node.common = node.remote;
            delete node.remote;
          }
        } else {
          if (node.remote.body !== undefined) {
            const change = {
              origin:   'remote',
              path:     node.path,
              oldValue: (node.common.body === false ? undefined : node.common.body),
              newValue: (node.remote.body === false ? undefined : node.remote.body),
              oldContentType: node.common.contentType,
              newContentType: node.remote.contentType
            };
            if (change.oldValue || change.newValue) {
              this.rs.local._emitChange(change);
            }

            if (!node.remote.body) { // no remote, so delete/don't create
              return;
            }

            node.common = node.remote;
            delete node.remote;
          }
        }
      }
    } else {
      if (node.common.body) {
        this.rs.local._emitChange({
          origin:   'remote',
          path:     node.path,
          oldValue: node.common.body,
          newValue: undefined,
          oldContentType: node.common.contentType,
          newContentType: undefined
        });
      }

      return undefined;
    }

    return node;
  }

  public async updateCommonTimestamp (path: string, revision: string): Promise<void> {
    return this.rs.local.getNodes([path]).then((nodes: RSNodes) => {
      if (nodes[path] &&
          nodes[path].common &&
          nodes[path].common.revision === revision) {
        nodes[path].common.timestamp = this.now();
      }
      return this.rs.local.setNodes(this.flush(nodes));
    });
  }

  public async markChildren (path, itemsMap, changedNodes: RSNodes, missingChildren): Promise<void> {
    const paths = [];
    const meta = {};
    const recurse = {};

    for (const item in itemsMap) {
      paths.push(path+item);
      meta[path+item] = itemsMap[item];
    }
    for (const childName in missingChildren) {
      paths.push(path+childName);
    }

    return this.rs.local.getNodes(paths).then((nodes: RSNodes) => {
      let cachingStrategy;
      let node;

      for (const nodePath in nodes) {
        node = nodes[nodePath];

        if (meta[nodePath]) {
          if (node && node.common) {
            if (nodeChanged(node, meta[nodePath].ETag)) {
              changedNodes[nodePath] = deepClone(node);
              changedNodes[nodePath].remote = {
                revision:  meta[nodePath].ETag,
                timestamp: this.now()
              };
              changedNodes[nodePath] = this.autoMerge(changedNodes[nodePath]);
            }
          } else {
            cachingStrategy = this.rs.caching.checkPath(nodePath);
            if (cachingStrategy === 'ALL') {
              changedNodes[nodePath] = {
                path: nodePath,
                common: {
                  timestamp: this.now()
                },
                remote: {
                  revision: meta[nodePath].ETag,
                  timestamp: this.now()
                }
              };
            }
          }

          if (changedNodes[nodePath] && meta[nodePath]['Content-Type']) {
            changedNodes[nodePath].remote.contentType = meta[nodePath]['Content-Type'];
          }

          if (changedNodes[nodePath] && meta[nodePath]['Content-Length']) {
            changedNodes[nodePath].remote.contentLength = meta[nodePath]['Content-Length'];
          }
        } else if (missingChildren[nodePath.substring(path.length)] && node && node.common) {
          if (node.common.itemsMap) {
            for (const commonItem in node.common.itemsMap) {
              recurse[nodePath+commonItem] = true;
            }
          }

          if (node.local && node.local.itemsMap) {
            for (const localItem in node.local.itemsMap) {
              recurse[nodePath+localItem] = true;
            }
          }

          if (node.remote || isFolder(nodePath)) {
            changedNodes[nodePath] = undefined;
          } else {
            changedNodes[nodePath] = this.autoMerge(node);

            if (typeof changedNodes[nodePath] === 'undefined') {
              const parentPath = this.getParentPath(nodePath);
              const parentNode = changedNodes[parentPath];
              const itemName = nodePath.substring(path.length);
              if (parentNode && parentNode.local) {
                delete parentNode.local.itemsMap[itemName];

                if (equal(parentNode.local.itemsMap, parentNode.common.itemsMap)) {
                  delete parentNode.local;
                }
              }
            }
          }
        }
      }

      return this.deleteRemoteTrees(Object.keys(recurse), changedNodes)
        .then(changedObjs2 => {
          return this.rs.local.setNodes(this.flush(changedObjs2));
        });
    });
  }

  public async deleteRemoteTrees (paths: Array<string>, changedNodes: RSNodes): Promise<RSNodes> {
    if (paths.length === 0) {
      return Promise.resolve(changedNodes);
    }

    return this.rs.local.getNodes(paths).then(async (nodes: RSNodes) => {
      const subPaths = {};

      function collectSubPaths (folder, path: string): void {
        if (folder && folder.itemsMap) {
          for (const itemName in folder.itemsMap) {
            subPaths[path+itemName] = true;
          }
        }
      }

      for (const path in nodes) {
        const node = nodes[path];

        // TODO Why check for the node here? I don't think this check ever applies
        if (!node) { continue; }

        if (isFolder(path)) {
          collectSubPaths(node.common, path);
          collectSubPaths(node.local, path);
        } else {
          if (node.common && typeof(node.common.body) !== undefined) {
            changedNodes[path] = deepClone(node);
            changedNodes[path].remote = {
              body:      false,
              timestamp: this.now()
            };
            changedNodes[path] = this.autoMerge(changedNodes[path]);
          }
        }
      }

      // Recurse whole tree depth levels at once:
      return this.deleteRemoteTrees(Object.keys(subPaths), changedNodes)
        .then(changedNodes2 => {
          return this.rs.local.setNodes(this.flush(changedNodes2));
        });
    });
  }

  public async completeFetch (path: string, bodyOrItemsMap: object, contentType: string, revision: string): Promise<any> {
    let paths: Array<string>;
    let parentPath: string;
    const pathsFromRootArr = pathsFromRoot(path);

    if (isFolder(path)) {
      paths = [path];
    } else {
      parentPath = pathsFromRootArr[1];
      paths = [path, parentPath];
    }

    return this.rs.local.getNodes(paths).then((nodes: RSNodes) => {
      let itemName: string;
      let node: RSNode = nodes[path];
      let parentNode: RSNode;
      const missingChildren = {};

      function collectMissingChildren (folder): void {
        if (folder && folder.itemsMap) {
          for (itemName in folder.itemsMap) {
            if (!bodyOrItemsMap[itemName]) {
              missingChildren[itemName] = true;
            }
          }
        }
      }

      if (typeof(node) !== 'object'  ||
          node.path !== path ||
          typeof(node.common) !== 'object') {
        node = { path: path, common: {} };
        nodes[path] = node;
      }

      node.remote = {
        revision: revision,
        timestamp: this.now()
      };

      if (isFolder(path)) {
        collectMissingChildren(node.common);
        collectMissingChildren(node.remote);

        node.remote.itemsMap = {};
        for (itemName in bodyOrItemsMap) {
          node.remote.itemsMap[itemName] = true;
        }
      } else {
        node.remote.body = bodyOrItemsMap;
        node.remote.contentType = contentType;

        parentNode = nodes[parentPath];
        if (parentNode && parentNode.local && parentNode.local.itemsMap) {
          itemName = path.substring(parentPath.length);
          parentNode.local.itemsMap[itemName] = true;
          if (equal(parentNode.local.itemsMap, parentNode.common.itemsMap)) {
            delete parentNode.local;
          }
        }
      }

      nodes[path] = this.autoMerge(node);

      return {
        toBeSaved:       nodes,
        missingChildren: missingChildren
      };
    });
  }

  public async completePush (path: string, action, conflict, revision: string): Promise<void> {
    return this.rs.local.getNodes([path]).then((nodes: RSNodes) => {
      const node = nodes[path];

      if (!node.push) {
        this.stopped = true;
        throw new Error('completePush called but no push version!');
      }

      if (conflict) {
        log('[Sync] We have a conflict');

        if (!node.remote || node.remote.revision !== revision) {
          node.remote = {
            revision:  revision || 'conflict',
            timestamp: this.now()
          };
          delete node.push;
        }

        nodes[path] = this.autoMerge(node);
      } else {
        node.common = {
          revision:  revision,
          timestamp: this.now()
        };

        if (action === 'put') {
          node.common.body = node.push.body;
          node.common.contentType = node.push.contentType;

          if (equal(node.local.body, node.push.body) &&
              node.local.contentType === node.push.contentType) {
            delete node.local;
          }

          delete node.push;
        } else if (action === 'delete') {
          if (node.local.body === false) { // No new local changes since push; flush it.
            nodes[path] = undefined;
          } else {
            delete node.push;
          }
        }
      }

      return this.rs.local.setNodes(this.flush(nodes));
    });
  }

  public async dealWithFailure (path: string): Promise<void> {
    return this.rs.local.getNodes([path]).then((nodes: RSNodes) => {
      if (nodes[path]) {
        delete nodes[path].push;
        return this.rs.local.setNodes(this.flush(nodes));
      }
    });
  }

  public interpretStatus (statusCode: string | number): ResponseStatus {
    const status: ResponseStatus = {
      statusCode:      statusCode,
      successful:      undefined,
      conflict:        undefined,
      unAuth:          undefined,
      notFound:        undefined,
      changed:         undefined,
      networkProblems: undefined
    };

    if (typeof statusCode === 'string' &&
        (statusCode === 'offline' || statusCode === 'timeout')) {
      status.successful = false;
      status.networkProblems = true;
      return status;
    } else if (typeof statusCode === 'number') {
      const series = Math.floor(statusCode / 100);

      status.successful = (series === 2 ||
                           statusCode === 304 ||
                           statusCode === 412 ||
                           statusCode === 404),
      status.conflict   = (statusCode === 412);
      status.unAuth     = ((statusCode === 401 && this.rs.remote.token !== Authorize.IMPLIED_FAKE_TOKEN) ||
                           statusCode === 402 ||
                           statusCode === 403);
      status.notFound   = (statusCode === 404);
      status.changed    = (statusCode !== 304);

      return status;
    }
  }

  public async handleGetResponse (path: string, status: ResponseStatus, bodyOrItemsMap, contentType: string, revision: string): Promise<boolean> {
    if (status.notFound) {
      if (isFolder(path)) {
        bodyOrItemsMap = {};
      } else {
        bodyOrItemsMap = false;
      }
    }

    if (status.changed) {
      return this.completeFetch(path, bodyOrItemsMap, contentType, revision)
        .then(dataFromFetch => {
          if (isFolder(path)) {
            if (this.corruptServerItemsMap(bodyOrItemsMap)) {
              log('[Sync] WARNING: Discarding corrupt folder description from server for ' + path);
              return false;
            } else {
              return this.markChildren(path, bodyOrItemsMap, dataFromFetch.toBeSaved, dataFromFetch.missingChildren)
                .then(() => { return true; });
            }
          } else {
            return this.rs.local.setNodes(this.flush(dataFromFetch.toBeSaved))
              .then(() => { return true; });
          }
        });
    } else {
      return this.updateCommonTimestamp(path, revision)
        .then(() => { return true; });
    }
  }

  public handleResponse (path: string, action, r): Promise<boolean> {
    const status = this.interpretStatus(r.statusCode);

    if (status.successful) {
      if (action === 'get') {
        return this.handleGetResponse(path, status, r.body, r.contentType, r.revision);
      } else if (action === 'put' || action === 'delete') {
        return this.completePush(path, action, status.conflict, r.revision).then(function () {
          return true;
        });
      } else {
        throw new Error(`cannot handle response for unknown action ${action}`);
      }
    } else {
      // Unsuccessful
      let error: Error;
      if (status.unAuth) {
        error = new UnauthorizedError();
      } else if (status.networkProblems) {
        error = new SyncError('Network request failed.');
      } else {
        error = new Error('HTTP response code ' + status.statusCode + ' received.');
      }

      return this.dealWithFailure(path).then(() => {
        this.rs._emit('error', error);
        throw error;
      });
    }
  }

  public finishTask (task: SyncTask, queueTask = true): void | Promise<void> {
    if (task.action === undefined) {
      delete this._running[task.path];
      return;
    }

    if (queueTask) {
      log("[Sync] queue finished task:", task.path);
      this._finishedTasks.push(task);
      if (this._finishedTasks.length > 1) {
        log("[Sync] delaying finished task:", task.path);
        return;
      }
    }

    log("[Sync] run task:", task.path);

    return task.promise
      .then(res => {
        return this.handleResponse(task.path, task.action, res);
      }, err => {
        log('[Sync] wireclient rejects its promise!', task.path, task.action, err);
        return this.handleResponse(task.path, task.action, { statusCode: 'offline' });
      })
      .then(async (completed) => {
        this._finishedTasks.shift();
        delete this._timeStarted[task.path];
        delete this._running[task.path];

        if (completed) {
          if (this._tasks[task.path]) {
            for (let i=0; i < this._tasks[task.path].length; i++) {
              this._tasks[task.path][i]();
            }
            delete this._tasks[task.path];
          }
        }

        this.rs._emit('sync-req-done', {
          tasksRemaining: Object.keys(this._tasks).length
        });

        if (this._finishedTasks.length > 0) {
          this.finishTask(this._finishedTasks[0], false);
          return;
        }

        await this.collectTasks(false).then(() => {
          // See if there are any more tasks that are not refresh tasks
          if (!this.hasTasks() || this.stopped) {
            log('[Sync] Sync is done! Reschedule?', Object.getOwnPropertyNames(this._tasks).length, this.stopped);
            if (!this.done) {
              this.done = true;
              this.rs._emit('sync-done', { completed: true });
            }
          } else {
            // Use a 10ms timeout to let the JavaScript runtime catch its breath
            // (and hopefully force an IndexedDB auto-commit?), and also to cause
            // the threads to get staggered and get a good spread over time:
            setTimeout(() => { this.doTasks(); }, 10);
          }
        });
      }, err => {
        log('[Sync] Error', err);

        this._finishedTasks.shift();
        delete this._timeStarted[task.path];
        delete this._running[task.path];

        this.rs._emit('sync-req-done', {
          tasksRemaining: Object.keys(this._tasks).length
        });

        if (this._finishedTasks.length > 0) {
          this.finishTask(this._finishedTasks[0], false);
          return;
        }

        if (!this.done) {
          this.done = true;
          this.rs._emit('sync-done', { completed: false });
        }
      });
  }

  public doTasks (): boolean {
    let numToHave: number, numAdded = 0, path: string;
    if (this.rs.remote.connected) {
      if (this.rs.remote.online) {
        numToHave = this.numThreads;
      } else {
        numToHave = 1;
      }
    } else {
      numToHave = 0;
    }
    const numToAdd = numToHave - Object.getOwnPropertyNames(this._running).length;
    if (numToAdd <= 0) {
      return true;
    }
    for (path in this._tasks) {
      if (!this._running[path]) {
        this._timeStarted[path] = this.now();
        this._running[path] = this.doTask(path);
        this._running[path].then(this.finishTask.bind(this));
        numAdded++;
        if (numAdded >= numToAdd) {
          return true;
        }
      }
    }
    return (numAdded >= numToAdd);
  }

  public async collectTasks (alsoCheckRefresh?: boolean): Promise<void> {
    if (this.hasTasks() || this.stopped) {
      return Promise.resolve();
    }

    return this.collectDiffTasks().then(numDiffs => {
      if (numDiffs || alsoCheckRefresh === false) {
        return Promise.resolve();
      } else {
        return this.collectRefreshTasks();
      }
    }, function (err) { throw err; });
  }

  public addTask (path: string, cb?): void {
    if (!this._tasks[path]) {
      this._tasks[path] = [];
    }
    if (typeof(cb) === 'function') {
      this._tasks[path].push(cb);
    }
  }

  /**
   * Method: sync
   **/
  public sync (): Promise<void> {
    this.done = false;

    if (!this.doTasks()) {
      return this.collectTasks().then(() => {
        try {
          this.doTasks();
        } catch(e) {
          log('[Sync] doTasks error', e);
        }
      }, function (e) {
        log('[Sync] Sync error', e);
        throw new Error('Local cache unavailable');
      });
    } else {
      return Promise.resolve();
    }
  }

  static _rs_init (remoteStorage): void {
    syncCycleCb = function (): void {
      // if (!config.cache) return false
      log('[Sync] syncCycleCb calling syncCycle');

      const env = new Env();
      if (env.isBrowser()) { handleVisibility(env, remoteStorage); }

      if (!remoteStorage.sync) {
        // Call this now that all other modules are also ready:
        remoteStorage.sync = new Sync(remoteStorage);

        if (remoteStorage.syncStopped) {
          log('[Sync] Instantiating sync stopped');
          remoteStorage.sync.stopped = true;
          delete remoteStorage.syncStopped;
        }
      }

      log('[Sync] syncCycleCb calling syncCycle');
      remoteStorage.syncCycle();
    };

    syncOnConnect = function (): void {
      remoteStorage.removeEventListener('connected', syncOnConnect);
      remoteStorage.startSync();
    };

    remoteStorage.on('ready', syncCycleCb);
    remoteStorage.on('connected', syncOnConnect);
  }

  static _rs_cleanup (remoteStorage): void {
    remoteStorage.stopSync();
    remoteStorage.removeEventListener('ready', syncCycleCb);
    remoteStorage.removeEventListener('connected', syncOnConnect);

    remoteStorage.sync = undefined;
    delete remoteStorage.sync;
  }
}

interface Sync extends EventHandling {}
applyMixins(Sync, [EventHandling]);

export = Sync;