Aam-Digital/ndb-core

View on GitHub
src/app/core/database/pouch-database.ts

Summary

Maintainability
C
7 hrs
Test Coverage
B
85%
/*
 *     This file is part of ndb-core.
 *
 *     ndb-core is free software: you can redistribute it and/or modify
 *     it under the terms of the GNU General Public License as published by
 *     the Free Software Foundation, either version 3 of the License, or
 *     (at your option) any later version.
 *
 *     ndb-core is distributed in the hope that it will be useful,
 *     but WITHOUT ANY WARRANTY; without even the implied warranty of
 *     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *     GNU General Public License for more details.
 *
 *     You should have received a copy of the GNU General Public License
 *     along with ndb-core.  If not, see <http://www.gnu.org/licenses/>.
 */

import { Database, GetAllOptions, GetOptions, QueryOptions } from "./database";
import { Logging } from "../logging/logging.service";
import PouchDB from "pouchdb-browser";
import memory from "pouchdb-adapter-memory";
import { PerformanceAnalysisLogging } from "../../utils/performance-analysis-logging";
import { Injectable, Optional } from "@angular/core";
import { firstValueFrom, Observable, Subject } from "rxjs";
import { filter } from "rxjs/operators";
import { HttpStatusCode } from "@angular/common/http";
import { environment } from "../../../environments/environment";
import { KeycloakAuthService } from "../session/auth/keycloak/keycloak-auth.service";

/**
 * Wrapper for a PouchDB instance to decouple the code from
 * that external library.
 *
 * Additional convenience functions on top of the PouchDB API
 * should be implemented in the abstract {@link Database}.
 */
@Injectable()
export class PouchDatabase extends Database {
  /**
   * Small helper function which creates a database with in-memory PouchDB initialized
   */
  static create(): PouchDatabase {
    return new PouchDatabase().initInMemoryDB();
  }

  /**
   * The reference to the PouchDB instance
   * @private
   */
  private pouchDB: PouchDB.Database;

  /**
   * A list of promises that resolve once all the (until now saved) indexes are created
   * @private
   */
  private indexPromises: Promise<any>[] = [];

  /**
   * An observable that emits a value whenever the PouchDB receives a new change.
   * This change can come from the current user or remotely from the (live) synchronization
   * @private
   */
  private changesFeed: Subject<any>;

  private databaseInitialized = new Subject<void>();

  /**
   * Create a PouchDB database manager.
   */
  constructor(@Optional() private authService?: KeycloakAuthService) {
    super();
  }

  /**
   * Initialize the PouchDB with the in-memory adapter.
   * See {@link https://github.com/pouchdb/pouchdb/tree/master/packages/node_modules/pouchdb-adapter-memory}
   * @param dbName the name for the database
   */
  initInMemoryDB(dbName = "in-memory-database"): PouchDatabase {
    PouchDB.plugin(memory);
    this.pouchDB = new PouchDB(dbName, { adapter: "memory" });
    this.databaseInitialized.complete();
    return this;
  }

  /**
   * Initialize the PouchDB with the IndexedDB/in-browser adapter (default).
   * See {link https://github.com/pouchdb/pouchdb/tree/master/packages/node_modules/pouchdb-browser}
   * @param dbName the name for the database under which the IndexedDB entries will be created
   * @param options PouchDB options which are directly passed to the constructor
   */
  initIndexedDB(
    dbName = "indexed-database",
    options?: PouchDB.Configuration.DatabaseConfiguration,
  ): PouchDatabase {
    this.pouchDB = new PouchDB(dbName, options);
    this.databaseInitialized.complete();
    return this;
  }

  /**
   * Initializes the PouchDB with the http adapter to directly access a remote CouchDB without replication
   * See {@link https://pouchdb.com/adapters.html#pouchdb_over_http}
   * @param dbName (relative) path to the remote database
   * @param fetch a overwrite for the default fetch handler
   */
  initRemoteDB(
    dbName = `${environment.DB_PROXY_PREFIX}/${environment.DB_NAME}`,
  ): PouchDatabase {
    const options = {
      adapter: "http",
      skip_setup: true,
      fetch: (url: string | Request, opts: RequestInit) =>
        this.defaultFetch(url, opts),
    };
    this.pouchDB = new PouchDB(dbName, options);
    this.databaseInitialized.complete();
    return this;
  }

  private defaultFetch: Fetch = async (url: string | Request, opts: any) => {
    if (typeof url !== "string") {
      const err = new Error("PouchDatabase.fetch: url is not a string");
      err["details"] = url;
      throw err;
    }

    const remoteUrl =
      environment.DB_PROXY_PREFIX + url.split(environment.DB_PROXY_PREFIX)[1];
    this.authService.addAuthHeader(opts.headers);

    let result: Response;
    try {
      result = await PouchDB.fetch(remoteUrl, opts);
    } catch (err) {
      Logging.debug("navigator.onLine", navigator.onLine);
      Logging.warn("Failed to fetch from DB", err);
    }

    // retry login if request failed with unauthorized
    if (!result || result.status === HttpStatusCode.Unauthorized) {
      try {
        await this.authService.login();
        this.authService.addAuthHeader(opts.headers);
        result = await PouchDB.fetch(remoteUrl, opts);
      } catch (err) {
        Logging.debug("navigator.onLine", navigator.onLine);
        Logging.warn("Failed to fetch from DB", err);
      }
    }

    if (!result || result.status >= 500) {
      Logging.debug("Actual DB Fetch response", result);
      Logging.debug("navigator.onLine", navigator.onLine);
      throw new DatabaseException({
        error: "Failed to fetch from DB",
        actualResponse: JSON.stringify(result),
        actualResponseBody: await result?.text(),
      });
    }
    return result;
  };

  async getPouchDBOnceReady(): Promise<PouchDB.Database> {
    await firstValueFrom(this.databaseInitialized, {
      defaultValue: this.pouchDB,
    });
    return this.pouchDB;
  }

  /**
   * Get the actual instance of the PouchDB
   */
  getPouchDB(): PouchDB.Database {
    return this.pouchDB;
  }

  /**
   * Load a single document by id from the database.
   * (see {@link Database})
   * @param id The primary key of the document to be loaded
   * @param options Optional PouchDB options for the request
   * @param returnUndefined (Optional) return undefined instead of throwing error if doc is not found in database
   */
  async get(
    id: string,
    options: GetOptions = {},
    returnUndefined?: boolean,
  ): Promise<any> {
    try {
      return await (await this.getPouchDBOnceReady()).get(id, options);
    } catch (err) {
      if (err.status === 404) {
        Logging.debug("Doc not found in database: " + id);
        if (returnUndefined) {
          return undefined;
        }
      }

      throw new DatabaseException(err, id);
    }
  }

  /**
   * Load all documents (matching the given PouchDB options) from the database.
   * (see {@link Database})
   *
   * Normally you should rather use "getAll()" or another well typed method of this class
   * instead of passing PouchDB specific options here
   * because that will make your code tightly coupled with PouchDB rather than any other database provider.
   *
   * @param options PouchDB options object as in the normal PouchDB library
   */
  async allDocs(options?: GetAllOptions) {
    try {
      const result = await (await this.getPouchDBOnceReady()).allDocs(options);
      return result.rows.map((row) => row.doc);
    } catch (err) {
      throw new DatabaseException(
        err,
        "allDocs; startkey: " + options?.["startkey"],
      );
    }
  }

  /**
   * Save a document to the database.
   * (see {@link Database})
   *
   * @param object The document to be saved
   * @param forceOverwrite (Optional) Whether conflicts should be ignored and an existing conflicting document forcefully overwritten.
   */
  async put(object: any, forceOverwrite = false): Promise<any> {
    if (forceOverwrite) {
      object._rev = undefined;
    }

    try {
      return await (await this.getPouchDBOnceReady()).put(object);
    } catch (err) {
      if (err.status === 409) {
        return this.resolveConflict(object, forceOverwrite, err);
      } else {
        throw new DatabaseException(err, object._id);
      }
    }
  }

  /**
   * Save an array of documents to the database
   * @param objects the documents to be saved
   * @param forceOverwrite whether conflicting versions should be overwritten
   * @returns array with the result for each object to be saved, if any item fails to be saved, this returns a rejected Promise.
   *          The save can partially fail and return a mix of success and error states in the array (e.g. `[{ ok: true, ... }, { error: true, ... }]`)
   */
  async putAll(objects: any[], forceOverwrite = false): Promise<any> {
    if (forceOverwrite) {
      objects.forEach((obj) => (obj._rev = undefined));
    }

    const pouchDB = await this.getPouchDBOnceReady();
    const results = await pouchDB.bulkDocs(objects);

    for (let i = 0; i < results.length; i++) {
      // Check if document update conflicts happened in the request
      const result = results[i] as PouchDB.Core.Error;
      if (result.status === 409) {
        results[i] = await this.resolveConflict(
          objects.find((obj) => obj._id === result.id),
          forceOverwrite,
          result,
        ).catch((e) => {
          Logging.warn(
            "error during putAll",
            e,
            objects.map((x) => x._id),
          );
          return new DatabaseException(e);
        });
      }
    }

    if (results.some((r) => r instanceof Error)) {
      return Promise.reject(results);
    }
    return results;
  }

  /**
   * Delete a document from the database
   * (see {@link Database})
   *
   * @param object The document to be deleted (usually this object must at least contain the _id and _rev)
   */
  remove(object: any) {
    return this.getPouchDBOnceReady()
      .then((pouchDB) => pouchDB.remove(object))
      .catch((err) => {
        throw new DatabaseException(err, object["_id"]);
      });
  }

  /**
   * Check if a database is new/empty.
   * Returns true if there are no documents in the database
   */
  isEmpty(): Promise<boolean> {
    return this.getPouchDBOnceReady()
      .then((pouchDB) => pouchDB.info())
      .then((res) => res.doc_count === 0);
  }

  /**
   * Listen to changes to documents which have an _id with the given prefix
   * @param prefix for which document changes are emitted
   * @returns observable which emits the filtered changes
   */
  changes(prefix: string): Observable<any> {
    if (!this.changesFeed) {
      this.changesFeed = new Subject();
      this.subscribeChanges();
    }
    return this.changesFeed.pipe(filter((doc) => doc._id.startsWith(prefix)));
  }

  private async subscribeChanges() {
    (await this.getPouchDBOnceReady())
      .changes({
        live: true,
        since: "now",
        include_docs: true,
      })
      .addListener("change", (change) => this.changesFeed.next(change.doc))
      .catch((err) => {
        if (
          err.statusCode === HttpStatusCode.Unauthorized ||
          err.statusCode === HttpStatusCode.GatewayTimeout
        ) {
          Logging.warn(err);
        } else {
          Logging.error(err);
        }

        // retry
        setTimeout(() => this.subscribeChanges(), 10000);
      });
  }

  /**
   * Destroy the database and all saved data
   */
  async destroy(): Promise<any> {
    await Promise.all(this.indexPromises);
    if (this.pouchDB) {
      return this.pouchDB.destroy();
    }
  }

  /**
   * Reset the database state so a new one can be opened.
   */
  reset() {
    this.pouchDB = undefined;
    this.changesFeed = undefined;
    this.databaseInitialized = new Subject();
  }

  /**
   * Query data from the database based on a more complex, indexed request.
   * (see {@link Database})
   *
   * This is directly calling the PouchDB implementation of this function.
   * Also see the documentation there: {@link https://pouchdb.com/api.html#query_database}
   *
   * @param fun The name of a previously saved database index
   * @param options Additional options for the query, like a `key`. See the PouchDB docs for details.
   */
  query(
    fun: string | ((doc: any, emit: any) => void),
    options: QueryOptions,
  ): Promise<any> {
    return this.getPouchDBOnceReady()
      .then((pouchDB) => pouchDB.query(fun, options))
      .catch((err) => {
        throw new DatabaseException(
          err,
          typeof fun === "string" ? fun : undefined,
        );
      });
  }

  /**
   * Create a database index to `query()` certain data more efficiently in the future.
   * (see {@link Database})
   *
   * Also see the PouchDB documentation regarding indices and queries: {@link https://pouchdb.com/api.html#query_database}
   *
   * @param designDoc The PouchDB style design document for the map/reduce query
   */
  saveDatabaseIndex(designDoc: any): Promise<void> {
    const creationPromise = this.createOrUpdateDesignDoc(designDoc);
    this.indexPromises.push(creationPromise);
    return creationPromise;
  }

  private async createOrUpdateDesignDoc(designDoc): Promise<void> {
    const existingDesignDoc = await this.get(designDoc._id, {}, true);
    if (!existingDesignDoc) {
      Logging.debug("creating new database index");
    } else if (
      JSON.stringify(existingDesignDoc.views) !==
      JSON.stringify(designDoc.views)
    ) {
      Logging.debug("replacing existing database index");
      designDoc._rev = existingDesignDoc._rev;
    } else {
      // already up to date, nothing more to do
      return;
    }

    await this.put(designDoc, true);
    await this.prebuildViewsOfDesignDoc(designDoc);
  }

  @PerformanceAnalysisLogging
  private async prebuildViewsOfDesignDoc(designDoc: any): Promise<void> {
    for (const viewName of Object.keys(designDoc.views)) {
      const queryName = designDoc._id.replace(/_design\//, "") + "/" + viewName;
      await this.query(queryName, { key: "1" });
    }
  }

  /**
   * Attempt to intelligently resolve conflicting document versions automatically.
   * @param newObject
   * @param overwriteChanges
   * @param existingError
   */
  private async resolveConflict(
    newObject: any,
    overwriteChanges = false,
    existingError: any = {},
  ): Promise<any> {
    const existingObject = await this.get(newObject._id);
    const resolvedObject = this.mergeObjects(existingObject, newObject);
    if (resolvedObject) {
      Logging.debug(
        "resolved document conflict automatically (" + resolvedObject._id + ")",
      );
      return this.put(resolvedObject);
    } else if (overwriteChanges) {
      Logging.debug(
        "overwriting conflicting document version (" + newObject._id + ")",
      );
      newObject._rev = existingObject._rev;
      return this.put(newObject);
    } else {
      existingError.message = `${
        existingError.message
      } (unable to resolve) ID: ${JSON.stringify(newObject)}`;
      throw new DatabaseException(existingError);
    }
  }

  private mergeObjects(_existingObject: any, _newObject: any) {
    // TODO: implement automatic merging of conflicting entity versions
    return undefined;
  }
}

/**
 * This overwrites PouchDB's error class which only logs limited information
 */
class DatabaseException extends Error {
  constructor(
    error: PouchDB.Core.Error | { [key: string]: any },
    entityId?: string,
  ) {
    super();

    if (entityId) {
      error["entityId"] = entityId;
    }
    Object.assign(this, error);
  }
}