emmercm/igir

View on GitHub
src/types/cache.ts

Summary

Maintainability
A
1 hr
Test Coverage
import fs from 'node:fs';
import path from 'node:path';
import * as v8 from 'node:v8';
import * as zlib from 'node:zlib';

import { E_CANCELED, Mutex } from 'async-mutex';

import KeyedMutex from '../keyedMutex.js';
import FsPoly from '../polyfill/fsPoly.js';
import Timer from '../timer.js';

export interface CacheProps {
  filePath?: string;
  fileFlushMillis?: number;
  saveOnExit?: boolean;
}

/**
 * A cache of a fixed size that ejects the oldest inserted key.
 */
export default class Cache<V> {
  private static readonly BUFFER_ENCODING: BufferEncoding = 'binary';

  private keyValues = new Map<string, V>();

  private readonly keyedMutex = new KeyedMutex(1000);

  private hasChanged: boolean = false;

  private saveToFileTimeout?: Timer;

  readonly filePath?: string;

  readonly fileFlushMillis?: number;

  private readonly saveMutex = new Mutex();

  constructor(props?: CacheProps) {
    this.filePath = props?.filePath;
    this.fileFlushMillis = props?.fileFlushMillis;
    if (props?.saveOnExit) {
      // WARN: Jest won't call this: https://github.com/jestjs/jest/issues/10927
      process.once('beforeExit', async () => {
        await this.save();
      });
    }
  }

  /**
   * Return if a key exists in the cache, waiting for any existing operations to complete first.
   */
  public async has(key: string): Promise<boolean> {
    return this.keyedMutex.runExclusiveForKey(key, () => this.keyValues.has(key));
  }

  /**
   * Return all the keys that exist in the cache.
   */
  public keys(): Set<string> {
    return new Set(this.keyValues.keys());
  }

  /**
   * Return the count of keys in the cache.
   */
  public size(): number {
    return this.keyValues.size;
  }

  /**
   * Get the value of a key in the cache, waiting for any existing operations to complete first.
   */
  public async get(key: string): Promise<V | undefined> {
    return this.keyedMutex.runExclusiveForKey(key, () => this.keyValues.get(key));
  }

  /**
   * Get the value of a key in the cache if it exists, or compute a value and set it in the cache
   * otherwise.
   */
  public async getOrCompute(
    key: string,
    runnable: (key: string) => V | Promise<V>,
    shouldRecompute?: (value: V) => boolean | Promise<boolean>,
  ): Promise<V> {
    return this.keyedMutex.runExclusiveForKey(key, async () => {
      if (this.keyValues.has(key)) {
        const existingValue = this.keyValues.get(key) as V;
        if (shouldRecompute === undefined || !(await shouldRecompute(existingValue))) {
          return existingValue;
        }
      }

      const val = await runnable(key);
      this.setUnsafe(key, val);
      return val;
    });
  }

  /**
   * Set the value of a key in the cache.
   */
  public async set(key: string, val: V): Promise<void> {
    return this.keyedMutex.runExclusiveForKey(key, () => this.setUnsafe(key, val));
  }

  private setUnsafe(key: string, val: V): void {
    const oldVal = this.keyValues.get(key);
    this.keyValues.set(key, val);
    if (val !== oldVal) {
      this.saveWithTimeout();
    }
  }

  /**
   * Delete a key in the cache.
   */
  public async delete(key: string | RegExp): Promise<void> {
    let keysToDelete: string[];
    if (key instanceof RegExp) {
      keysToDelete = [...this.keys().keys()].filter((k) => k.match(key));
    } else {
      keysToDelete = [key];
    }

    // Note: avoiding lockKey() because it could get expensive with many keys to delete
    await this.keyedMutex.runExclusiveGlobally(() => {
      keysToDelete.forEach((k) => this.deleteUnsafe(k));
    });
  }

  private deleteUnsafe(key: string): void {
    this.keyValues.delete(key);
    this.saveWithTimeout();
  }

  /**
   * Load the cache from a file.
   */
  public async load(): Promise<Cache<V>> {
    if (this.filePath === undefined || !(await FsPoly.exists(this.filePath))) {
      // Cache doesn't exist, so there is nothing to load
      return this;
    }

    try {
      const compressed = await fs.promises.readFile(this.filePath);
      if (compressed.length === 0) {
        return this;
      }
      // NOTE(cemmer): util.promisify(zlib.inflate) seems to have issues not throwing correctly
      const decompressed = zlib.inflateSync(compressed);
      const keyValuesObject = v8.deserialize(decompressed);
      const keyValuesEntries = Object.entries(keyValuesObject) as [string, V][];
      this.keyValues = new Map(keyValuesEntries);
    } catch {
      /* ignored */
    }

    return this;
  }

  private saveWithTimeout(): void {
    this.hasChanged = true;
    if (
      this.filePath === undefined ||
      this.fileFlushMillis === undefined ||
      this.saveToFileTimeout !== undefined
    ) {
      return;
    }

    this.saveToFileTimeout = Timer.setTimeout(async () => this.save(), this.fileFlushMillis);
  }

  /**
   * Save the cache to a file.
   */
  public async save(): Promise<void> {
    try {
      await this.saveMutex.runExclusive(async () => {
        // Clear any existing timeout
        if (this.saveToFileTimeout !== undefined) {
          this.saveToFileTimeout.cancel();
          this.saveToFileTimeout = undefined;
        }

        if (this.filePath === undefined || !this.hasChanged) {
          return;
        }

        const keyValuesObject = Object.fromEntries(this.keyValues);
        const decompressed = v8.serialize(keyValuesObject);
        // NOTE(cemmer): util.promisify(zlib.deflate) seems to have issues not throwing correctly
        const compressed = zlib.deflateSync(decompressed);

        // Ensure the directory exists
        const dirPath = path.dirname(this.filePath);
        if (!(await FsPoly.exists(dirPath))) {
          await FsPoly.mkdir(dirPath, { recursive: true });
        }

        // Write to a temp file first
        const tempFile = await FsPoly.mktemp(this.filePath);
        await FsPoly.writeFile(tempFile, compressed, { encoding: Cache.BUFFER_ENCODING });

        // Validate the file was written correctly
        const tempFileCache = await new Cache({ filePath: tempFile }).load();
        if (tempFileCache.size() !== Object.keys(keyValuesObject).length) {
          // The written file is bad, don't use it
          await FsPoly.rm(tempFile, { force: true });
          return;
        }

        // Overwrite the real file with the temp file
        try {
          await FsPoly.mv(tempFile, this.filePath);
        } catch {
          return;
        }
        this.hasChanged = false;
        this.saveMutex.cancel(); // cancel all waiting locks, we just saved
      });
    } catch (error) {
      if (error !== E_CANCELED) {
        throw error;
      }
    }
  }
}