forattini-dev/s3db.js

View on GitHub
src/s3-client.class.ts

Summary

Maintainability
F
3 days
Test Coverage
import * as path from "path";
import { chunk } from "lodash";
import { nanoid } from "nanoid";
import { Stream } from "stream";
import EventEmitter from "events";
import { S3, Credentials } from "aws-sdk";
import PromisePool from "@supercharge/promise-pool";

import { ClientNoSuchKey } from "./errors";

/**
 * Event-emitting wrapper around an AWS SDK `S3` client that is bound to a
 * single bucket and an optional key prefix.
 *
 * Configuration is read from a connection string parsed with `URL`:
 * the hostname is the bucket, the path is the key prefix, the user info holds
 * the credentials and the `parallelism` query parameter overrides the
 * `parallelism` constructor option.
 *
 * Every S3 call emits `"request"` before it is sent, then `"response"` and an
 * event named after the operation once it succeeds. Failures are emitted as
 * `"error"` only when an `"error"` listener is registered.
 */
export class S3Client extends EventEmitter {
  /** Page size cap for `listObjectsV2` and batch size used by `deleteObjects`. */
  private static readonly MAX_BATCH_SIZE = 1000;

  id: string;
  client: S3;
  bucket: string;
  keyPrefix: string;
  parallelism: number;

  /**
   * @param options.connectionString URL whose hostname is the bucket, whose
   *   path is the key prefix and whose user info carries the credentials.
   * @param options.parallelism Concurrency used by batch operations when the
   *   connection string has no valid `parallelism` query parameter.
   * @param options.AwsS3 Pre-built S3 client; when given, the credentials in
   *   the connection string are ignored.
   */
  constructor({
    connectionString,
    parallelism = 10,
    AwsS3,
  }: {
    connectionString: string;
    parallelism?: number;
    AwsS3?: S3;
  }) {
    super();
    this.id = nanoid(7);

    const uri = new URL(connectionString);

    this.bucket = uri.hostname;
    this.parallelism = S3Client.parseParallelism(
      uri.searchParams.get("parallelism"),
      parallelism
    );

    // Dropping the segment before the first "/" turns "/", "" into "" and
    // "/a/b" into "a/b".
    this.keyPrefix = uri.pathname.split("/").slice(1).join("/");

    this.client =
      AwsS3 ||
      new S3(
        uri.username
          ? {
              credentials: new Credentials({
                // URL exposes user info percent-encoded; secrets containing
                // "/" or "+" must be decoded before being handed to the SDK.
                accessKeyId: S3Client.decodeUserInfo(uri.username),
                secretAccessKey: S3Client.decodeUserInfo(uri.password),
              }),
            }
          : // No user info: do not force empty credentials onto the client.
            {}
      );
  }

  /** Parses the `parallelism` query value, falling back on anything that is not a positive integer. */
  private static parseParallelism(raw: string | null, fallback: number): number {
    if (raw === null) return fallback;

    const parsed = Number.parseInt(raw, 10);
    return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
  }

  /** Percent-decodes a URL user-info component, returning it untouched when it is malformed. */
  private static decodeUserInfo(value: string): string {
    try {
      return decodeURIComponent(value);
    } catch {
      return value;
    }
  }

  /** True when `error` is an `Error` whose `name` is one of `names`. */
  private static hasErrorName(error: unknown, ...names: string[]): boolean {
    return error instanceof Error && names.includes(error.name);
  }

  /**
   * Builds the full object key. Uses POSIX joining so keys never contain
   * backslashes, and leaves the key untouched when there is no prefix.
   */
  private resolveKey(key: string): string {
    return this.keyPrefix ? path.posix.join(this.keyPrefix, key) : key;
  }

  /** Removes the leading key prefix (and one separating "/") from each key. */
  private stripKeyPrefix(keys: string[]): string[] {
    if (!this.keyPrefix) return keys;

    return keys.map((key) => {
      const relative = key.startsWith(this.keyPrefix)
        ? key.slice(this.keyPrefix.length)
        : key;
      return relative.startsWith("/") ? relative.slice(1) : relative;
    });
  }

  /** Emits the `"response"` event followed by the operation-named event. */
  private emitResponse(operation: string, input: unknown, output: unknown): void {
    this.emit("response", operation, input, output);
    this.emit(operation, input, output);
  }

  /**
   * Emits `"error"` only when someone listens for it. An unhandled `"error"`
   * emit throws, which would replace the error the caller is about to receive.
   */
  private emitError(error: unknown): void {
    if (this.listenerCount("error") > 0) {
      this.emit("error", error);
    }
  }

  /**
   * Proxy to S3 `getObject`.
   *
   * @param key Object key, relative to the key prefix.
   * @returns The SDK response.
   * @throws ClientNoSuchKey when S3 reports `NoSuchKey`; any other error is rethrown as is.
   */
  async getObject(key: string) {
    try {
      const options = {
        Bucket: this.bucket,
        Key: this.resolveKey(key),
      };

      this.emit("request", "getObject", options);
      const response = await this.client.getObject(options).promise();
      this.emitResponse("getObject", options, response);

      return response;
    } catch (error: unknown) {
      if (S3Client.hasErrorName(error, "NoSuchKey")) {
        throw new ClientNoSuchKey({ bucket: this.bucket, key });
      }

      this.emitError(error);
      throw error;
    }
  }

  /**
   * Proxy to S3 `putObject`.
   *
   * @param param.key Object key, relative to the key prefix.
   * @param param.metadata Copied into the request `Metadata`.
   * @param param.contentType Sent as `ContentType`.
   * @param param.body Sent as `Body`; a missing or empty body becomes `""`.
   * @param param.contentEncoding Sent as `ContentEncoding`.
   * @returns The SDK response.
   */
  async putObject({
    key,
    metadata,
    contentType,
    body,
    contentEncoding,
  }: {
    key: string;
    metadata?: object;
    contentType?: string;
    body?: string | Stream | Uint8Array;
    contentEncoding?: string | null | undefined;
  }) {
    try {
      const options: any = {
        Bucket: this.bucket,
        Key: this.resolveKey(key),
        Metadata: { ...metadata },
        Body: body || "",
        ContentType: contentType,
        ContentEncoding: contentEncoding,
      };

      this.emit("request", "putObject", options);
      const response = await this.client.putObject(options).promise();
      this.emitResponse("putObject", options, response);

      return response;
    } catch (error: unknown) {
      this.emitError(error);
      throw error;
    }
  }

  /**
   * Proxy to S3 `headObject`.
   *
   * @param key Object key, relative to the key prefix.
   * @returns The SDK response.
   * @throws ClientNoSuchKey when S3 reports `NoSuchKey` or `NotFound`.
   */
  async headObject(key: string) {
    try {
      const options = {
        Bucket: this.bucket,
        Key: this.resolveKey(key),
      };

      this.emit("request", "headObject", options);
      const response = await this.client.headObject(options).promise();
      this.emitResponse("headObject", options, response);

      return response;
    } catch (error: unknown) {
      if (S3Client.hasErrorName(error, "NoSuchKey", "NotFound")) {
        throw new ClientNoSuchKey({ bucket: this.bucket, key });
      }

      this.emitError(error);
      throw error;
    }
  }

  /**
   * Proxy to S3 `deleteObject`.
   *
   * @param key Object key, relative to the key prefix.
   * @returns The SDK response.
   * @throws ClientNoSuchKey when S3 reports `NoSuchKey`.
   */
  async deleteObject(key: string) {
    try {
      const options = {
        Bucket: this.bucket,
        Key: this.resolveKey(key),
      };

      this.emit("request", "deleteObject", options);
      const response = await this.client.deleteObject(options).promise();
      this.emitResponse("deleteObject", options, response);

      return response;
    } catch (error: unknown) {
      // Listeners see every failure; the guarded emit cannot throw, so the
      // NoSuchKey translation below is always reached.
      this.emitError(error);

      if (S3Client.hasErrorName(error, "NoSuchKey")) {
        throw new ClientNoSuchKey({ bucket: this.bucket, key });
      }

      throw error;
    }
  }

  /**
   * Proxy to S3 `deleteObjects`. Keys are sent in batches of at most 1000,
   * with up to `parallelism` batches in flight.
   *
   * @param keys Object keys, relative to the key prefix.
   * @returns `deleted`: the SDK responses of the batches that succeeded;
   *   `notFound`: the errors collected for the batches that failed.
   */
  async deleteObjects(keys: string[]) {
    const batches = chunk(keys, S3Client.MAX_BATCH_SIZE);

    const { results, errors } = await PromisePool.for(batches)
      .withConcurrency(this.parallelism)
      .process(async (batch: string[]) => {
        try {
          const options = {
            Bucket: this.bucket,
            Delete: {
              Objects: batch.map((key) => ({ Key: this.resolveKey(key) })),
            },
          };

          this.emit("request", "deleteObjects", options);
          const response = await this.client.deleteObjects(options).promise();
          this.emitResponse("deleteObjects", options, response);

          return response;
        } catch (error: unknown) {
          this.emitError(error);
          throw error;
        }
      });

    return {
      deleted: results,
      notFound: errors,
    };
  }

  /**
   * Proxy to S3 `listObjectsV2`, returning a single page.
   *
   * @param param.prefix Key prefix, relative to the client's key prefix.
   * @param param.maxKeys Page size sent as `MaxKeys` (default 1000).
   * @param param.continuationToken Token from a previous page.
   * @returns The SDK response for that page.
   */
  async listObjects({
    prefix,
    maxKeys = S3Client.MAX_BATCH_SIZE,
    continuationToken,
  }: {
    prefix?: string;
    maxKeys?: number;
    continuationToken?: any;
  } = {}): Promise<S3.ListObjectsV2Output> {
    try {
      const options = {
        Bucket: this.bucket,
        MaxKeys: maxKeys,
        ContinuationToken: continuationToken,
        Prefix: this.resolveKey(prefix || ""),
      };

      this.emit("request", "listObjectsV2", options);
      const response = await this.client.listObjectsV2(options).promise();
      this.emitResponse("listObjectsV2", options, response);

      return response;
    } catch (error: unknown) {
      this.emitError(error);
      throw error;
    }
  }

  /**
   * Counts the objects under `prefix` by summing `KeyCount` over every page.
   *
   * @param param.prefix Key prefix, relative to the client's key prefix.
   */
  async count({ prefix }: { prefix?: string } = {}): Promise<number> {
    this.emit("request", "count", { prefix });

    let total = 0;
    let truncated = true;
    let continuationToken: string | undefined;

    while (truncated) {
      const page = await this.listObjects({ prefix, continuationToken });

      total += page.KeyCount || 0;
      truncated = page.IsTruncated || false;
      continuationToken = page.NextContinuationToken;
    }

    this.emitResponse("count", { prefix }, total);

    return total;
  }

  /**
   * Lists every key under `prefix`, walking all pages.
   *
   * @param param.prefix Key prefix, relative to the client's key prefix.
   * @returns Keys with the client's key prefix removed.
   */
  async getAllKeys({ prefix }: { prefix?: string } = {}): Promise<string[]> {
    this.emit("request", "getAllKeys", { prefix });

    const collected: string[] = [];
    let truncated = true;
    let continuationToken: string | undefined;

    while (truncated) {
      const page = await this.listObjects({ prefix, continuationToken });

      for (const { Key } of page.Contents || []) {
        if (Key !== undefined) collected.push(Key);
      }

      truncated = page.IsTruncated || false;
      continuationToken = page.NextContinuationToken;
    }

    const keys = this.stripKeyPrefix(collected);

    this.emitResponse("getAllKeys", { prefix }, keys);

    return keys;
  }

  /**
   * Walks the listing until `offset` keys have been skipped and returns the
   * continuation token that resumes right after them.
   *
   * @param param.prefix Key prefix, relative to the client's key prefix.
   * @param param.offset Number of keys to skip (default 1000).
   * @returns `null` when there is nothing to skip; `undefined` when the
   *   listing ends at or before `offset`; otherwise the continuation token.
   */
  async getContinuationTokenAfterOffset({
    prefix,
    offset = 1000,
  }: {
    prefix?: string;
    offset?: number;
  }): Promise<string | null | undefined> {
    if (offset <= 0) return null;

    let truncated = true;
    let continuationToken: string | undefined;
    let skipped = 0;

    while (truncated && skipped < offset) {
      // Ask only for what is still missing so a short page can never make
      // the walk run past `offset`.
      const maxKeys = Math.min(S3Client.MAX_BATCH_SIZE, offset - skipped);

      const page = await this.listObjects({ prefix, maxKeys, continuationToken });

      skipped += page.Contents ? page.Contents.length : 0;
      truncated = page.IsTruncated || false;
      continuationToken = page.NextContinuationToken;
    }

    return continuationToken;
  }

  /**
   * Returns up to `amount` keys under `prefix`, starting after `offset` keys.
   *
   * @param param.prefix Key prefix, relative to the client's key prefix.
   * @param param.offset Number of keys to skip (default 0).
   * @param param.amount Maximum number of keys to return (default 100).
   * @returns Keys with the client's key prefix removed; empty when `offset`
   *   is at or past the end of the listing.
   */
  async getKeysPage({
    prefix,
    offset = 0,
    amount = 100,
  }: {
    prefix?: string;
    offset?: number;
    amount?: number;
  } = {}): Promise<string[]> {
    if (amount <= 0) return [];

    let continuationToken: string | undefined;

    if (offset > 0) {
      const token = await this.getContinuationTokenAfterOffset({
        prefix,
        offset,
      });

      // Without a token the listing has no keys beyond `offset`; listing
      // again without one would restart from the first key.
      if (!token) return [];

      continuationToken = token;
    }

    const collected: string[] = [];
    let truncated = true;

    while (truncated && collected.length < amount) {
      const page = await this.listObjects({
        prefix,
        continuationToken,
        maxKeys: Math.min(S3Client.MAX_BATCH_SIZE, amount - collected.length),
      });

      for (const { Key } of page.Contents || []) {
        if (Key !== undefined) collected.push(Key);
      }

      truncated = page.IsTruncated || false;
      continuationToken = page.NextContinuationToken;
    }

    return this.stripKeyPrefix(collected.slice(0, amount));
  }
}

// Default export of the same class that is also exported by name above.
export default S3Client;