bcgov/common-object-management-service

View on GitHub
app/src/services/sync.js

Summary

Maintainability
F
1 wk
Test Coverage
A
95%
const { NIL: SYSTEM_USER, v4: uuidv4, validate: uuidValidate } = require('uuid');

const log = require('../components/log')(module.filename);
const utils = require('../db/models/utils');

const { ObjectModel, Version } = require('../db/models');
const { getKeyValue, getUniqueObjects, toLowerKeys } = require('../components/utils');

const metadataService = require('./metadata');
const objectService = require('./object');
const storageService = require('./storage');
const tagService = require('./tag');
const versionService = require('./version');

/**
 * The Sync Service
 * Synchronizes data between S3 object storage and the COMS database
 */
const service = {
  /**
   * @function _deriveObjectId
   * Resolves the COMS object id that should be used for an S3 object.
   * For a regular object, an existing valid `coms-id` tag is reused; otherwise a freshly generated
   * uuid is returned and, when the TagSet has room, written back to S3 as a `coms-id` tag.
   * For a soft-deleted object, every listed version is inspected until a valid `coms-id` tag is found.
   * @param {object | boolean} s3Object The result of an S3 HeadObject operation, or boolean `true`
   * when the object is soft-deleted
   * @param {string} path String representing the canonical path for the specified object
   * @param {string | null} bucketId uuid of bucket or `null` if syncing object in default bucket
   * @returns {Promise<string>} Resolves to an existing objectId or a newly generated one
   */
  _deriveObjectId: async (s3Object, path, bucketId) => {
    // Fallback id, only kept when no valid `coms-id` tag can be recovered from S3
    let objId = uuidv4();

    if (typeof s3Object === 'object') { // If regular S3 Object
      const tagging = await storageService.getObjectTagging({ filePath: path, bucketId: bucketId });
      const TagSet = tagging.TagSet ?? [];
      const s3ObjectComsId = TagSet.find(obj => (obj.Key === 'coms-id'))?.Value;

      if (s3ObjectComsId && uuidValidate(s3ObjectComsId)) {
        objId = s3ObjectComsId;
      } else if (TagSet.length < 10) {
        // Only tag when there is remaining space in the TagSet.
        // putObjectTagging replaces S3 tags so the new TagSet must contain the existing values.
        await storageService.putObjectTagging({
          filePath: path,
          bucketId: bucketId,
          tags: TagSet.concat([{ Key: 'coms-id', Value: objId }])
        }).catch((err) => {
          // Tagging is best-effort: a failure here must not abort the sync
          log.warn(`Unable to add coms-id tag: ${err.message}`, { function: '_deriveObjectId' });
        });
      }
    } else if (typeof s3Object === 'boolean') { // If soft-deleted S3 Object
      const { Versions } = await storageService.listAllObjectVersions({ filePath: path, bucketId: bucketId });

      // Sequential on purpose: stop issuing S3 requests as soon as a valid id is found
      for (const versionId of Versions.map(version => version.VersionId)) {
        const tagging = await storageService.getObjectTagging({
          filePath: path,
          s3VersionId: versionId,
          bucketId: bucketId
        });
        const oldObjId = (tagging.TagSet ?? []).find(obj => obj.Key === 'coms-id')?.Value;

        if (oldObjId && uuidValidate(oldObjId)) {
          objId = oldObjId;
          break; // Stop iterating if a valid uuid was found
        }
      }
    }

    return objId;
  },

  /**
   * @function syncJob
   * Orchestrates the synchronization of all aspects of a specified object
   * (object record, then its versions, then each version's tags and metadata).
   * All child operations share the transaction supplied by `utils.trxWrapper`.
   * @param {string} path String representing the canonical path for the specified object
   * @param {string | null} bucketId uuid of bucket or `null` if syncing object in default bucket
   * @param {boolean} [full=false] Optional boolean indicating whether to execute full recursive run
   * @param {string} [userId=SYSTEM_USER] Optional uuid attributing which user added the job
   * @returns {Promise<string | undefined>} Resolves to object uuid or undefined when sync is completed
   * @throws If `path` is falsy or the synchronization job encountered an error (logged, then rethrown)
   */
  syncJob: async (path, bucketId, full = false, userId = SYSTEM_USER) => {
    try {
      if (!path) throw new Error('Path must be defined');

      return await utils.trxWrapper(async (trx) => {
        // 1. Sync Object
        const { object } = await service.syncObject(path, bucketId, userId, trx);

        // 2. Sync Object Versions (nothing to do when the object no longer exists)
        const versions = object ? await service.syncVersions(object, userId, trx) : [];

        // 3. Sync Version Metadata & Tags
        for (const v of versions) {
          const tasks = [ // Always Update Tags regardless of modification state
            service.syncTags(v.version, path, bucketId, userId, trx)
          ];
          // Only Update Metadata if version has modifications or full mode
          if (v.modified || full) {
            tasks.push(service.syncMetadata(v.version, path, bucketId, userId, trx));
          }

          await Promise.all(tasks);
        }

        return object?.id;
      });
    }
    catch (err) {
      log.error(err, { function: 'syncJob' });
      throw err;
    }
  },

  /**
   * @function syncObject
   * Synchronizes Object level data between S3 and COMS:
   * - present in S3 and in COMS: refreshes `lastSyncedDate` and promotes the `public` flag if S3 says public
   * - present in S3 only: creates the COMS object using the id from `_deriveObjectId`
   * - absent from S3 but in COMS: deletes the COMS object and prunes orphaned metadata and tags
   * @param {string} path The path of object in sync job
   * @param {string | null} bucketId The uuid bucketId of object in sync job
   * @param {string} [userId=SYSTEM_USER] The uuid of a user that created the sync job
   * @param {object} [etrx=undefined] An optional Objection Transaction object
   * @returns {Promise<object>} `{ modified, object }` where `object` is the created/updated COMS object
   * (undefined when nothing was written or the object was deleted) and `modified` is a boolean
   * @throws Rolls back a self-started transaction and rethrows on failure
   */
  syncObject: async (path, bucketId, userId = SYSTEM_USER, etrx = undefined) => {
    let trx;
    try {
      trx = etrx ? etrx : await ObjectModel.startTransaction();

      let modified = false;
      let response;

      // Check for COMS and S3 Object statuses
      // NOTE(review): allSettled means a failed COMS search is treated the same as "not in COMS" - confirm intent
      const [comsObject, s3Object] = await Promise.allSettled([
        // COMS Object
        objectService.searchObjects({ path: path, bucketId: bucketId }, trx),
        // S3 Object
        storageService.headObject({ filePath: path, bucketId: bucketId })
          // Resolve to boolean true if object is soft-deleted in S3, false for any other failure.
          // Optional chaining guards against errors that carry no `$response` (or no headers) at all.
          .catch((e) => !!e?.$response?.headers?.['x-amz-delete-marker'])
      ]).then(([comsPromise, s3Promise]) => ([comsPromise.value?.data[0], s3Promise.value]));

      if (s3Object) {
        // Determine S3 Public status
        let s3Public;
        try {
          s3Public = await storageService.getObjectPublic({ filePath: path, bucketId: bucketId });
        } catch (e) {
          // Gracefully continue even when S3 ACL management operation fails
          log.warn(`Failed to read ACL permissions from S3: ${e.message}`, {
            function: 'syncObject', filePath: path, bucketId: bucketId
          });
        }

        if (comsObject) {
          // Case: already synced - record & update public status as needed
          response = await objectService.update({
            id: comsObject.id,
            userId: userId,
            path: comsObject.path,
            // update if public in s3, otherwise keep the COMS value
            public: s3Public ? s3Public : comsObject.public,
            lastSyncedDate: new Date().toISOString()
          }, trx);

          // if public flag changed mark as modified
          if (s3Public && !comsObject.public) modified = true;
        } else {
          // Case: not in COMS - insert new COMS object
          const objId = await service._deriveObjectId(s3Object, path, bucketId);

          response = await objectService.create({
            id: objId,
            name: path.match(/(?!.*\/)(.*)$/)[0], // get `name` column (text after the last '/')
            path: path,
            public: s3Public,
            bucketId: bucketId,
            userId: userId,
            lastSyncedDate: new Date().toISOString()
          }, trx);

          modified = true;
        }
      } else if (comsObject) {
        // Case: missing in S3 - drop COMS object
        // Delete COMS Object and cascade all child records from COMS
        await objectService.delete(comsObject.id, trx);

        // TODO: Relatively slow operations - determine if this can be optimized
        // Prune metadata and tag records
        await metadataService.pruneOrphanedMetadata(trx);
        await tagService.pruneOrphanedTags(trx);
      }

      if (!etrx) await trx.commit();
      return { modified: modified, object: response };
    } catch (err) {
      if (!etrx && trx) await trx.rollback();
      throw err;
    }
  },

  /**
   * @function syncVersions
   * Synchronizes Version level data: removes COMS versions that are duplicated or no longer in S3,
   * then creates or updates a COMS version for every S3 version and delete marker.
   * @param {object | string} object The parent object or object uuid
   * @param {string} [userId=SYSTEM_USER] The uuid of a user that created the sync job
   * @param {object} [etrx=undefined] An optional Objection Transaction object
   * @returns {Promise<Array<{ modified: (boolean | undefined), version: object }>>} One entry per S3 version
   * holding the COMS version and whether it was modified (`modified` is left unset when nothing changed)
   * @throws Rolls back a self-started transaction and rethrows on failure, including a failed
   * COMS or S3 version listing
   */
  syncVersions: async (object, userId = SYSTEM_USER, etrx = undefined) => {
    let trx;
    try {
      trx = etrx ? etrx : await Version.startTransaction();

      // Fetch COMS Object record if necessary
      const comsObject = typeof object === 'object' ? object : await objectService.read(object, trx);

      // Check for COMS and S3 Version statuses.
      // Both listings are required below, so fail fast with the real error instead of
      // continuing with `undefined` and crashing later on an unrelated TypeError.
      const [comsVersions, s3VersionsRaw] = await Promise.all([
        versionService.list(comsObject.id, trx),
        storageService.listAllObjectVersions({ filePath: comsObject.path, bucketId: comsObject.bucketId })
      ]);

      // Combine S3 DeleteMarkers and Versions into one array
      const s3Versions = s3VersionsRaw.DeleteMarkers
        .map(dm => ({ DeleteMarker: true, ...dm }))
        .concat(s3VersionsRaw.Versions);

      // delete versions from COMS that are not in S3
      // get list of unique coms versions
      const uniqueCVIds = getUniqueObjects(comsVersions, 's3VersionId').map(v => v.id);

      // get COMS versions that are not in S3 (matching on s3VersionId) OR not
      // in list of unique COMS versions (matching on id)
      const cVsToDelete = comsVersions.filter(cv => {
        // String() lets a null COMS s3VersionId match the literal 'null' VersionId of non-versioned buckets
        const notInS3 = !s3Versions.some(s3v => (s3v.VersionId === String(cv.s3VersionId)));
        const isDuplicate = !uniqueCVIds.includes(cv.id);
        return notInS3 || isDuplicate;
      });

      if (cVsToDelete.length) {
        await Version.query(trx)
          .delete()
          .where('objectId', comsObject.id)
          .whereNotNull('s3VersionId')
          .whereIn('id', cVsToDelete.map(cv => cv.id));
      }
      // delete versions from comsVersions array for further comparisons
      const comsVersionsToKeep = comsVersions.filter(cv => !cVsToDelete.some(v => cv.id === v.id));

      // Add and Update versions in COMS
      const response = await Promise.all(s3Versions.map(async s3Version => {
        // Normalised timestamp used for both comparison and persistence
        const lastModifiedDate = s3Version.LastModified
          ? new Date(s3Version.LastModified).toISOString()
          : undefined;

        // S3 Object is in non-versioned bucket
        if (s3Version.VersionId === 'null') {
          const mimeType = await storageService.headObject({
            filePath: comsObject.path,
            bucketId: comsObject.bucketId
          }).then(obj => obj.ContentType);

          // Get existing version
          const existingVersion = comsVersions[0];

          // No existing version found
          if (!existingVersion) {
            const newVersion = await versionService.create({
              s3VersionId: null,
              mimeType: mimeType,
              id: comsObject.id,
              etag: s3Version.ETag,
              isLatest: true,
              lastModifiedDate: lastModifiedDate
            }, userId, trx);
            return { modified: true, version: newVersion };
          }

          // Latest version has different values
          // NOTE(review): loose `!=` kept from the original - presumably so null and undefined compare equal
          if (
            existingVersion.mimeType !== mimeType ||
            existingVersion.etag != s3Version.ETag ||
            existingVersion.lastModifiedDate != lastModifiedDate
          ) {
            const updatedVersion = await versionService.update({
              mimeType: mimeType,
              id: comsObject.id,
              etag: s3Version.ETag,
              isLatest: true,
              lastModifiedDate: lastModifiedDate
            }, userId, trx);
            return { modified: true, version: updatedVersion };
          }

          // Version record not modified
          return { version: existingVersion };
        }

        // S3 Object is in versioned bucket (ie: if VersionId is not 'null')
        const comsVersion = comsVersionsToKeep.find(cv => cv.s3VersionId === s3Version.VersionId);

        if (comsVersion) { // Version is in COMS
          let modified, updated;

          // Recorded version has different values
          // NOTE(review): loose `!=` kept from the original - presumably so null and undefined compare equal
          if (
            comsVersion.etag != s3Version.ETag ||
            comsVersion.lastModifiedDate != lastModifiedDate
          ) {
            modified = true;
            updated = await versionService.update({
              id: comsObject.id,
              s3VersionId: s3Version.VersionId,
              etag: s3Version.ETag,
              lastModifiedDate: lastModifiedDate
            }, userId, trx);
          }

          // Re-assert isLatest for the version S3 reports as latest.
          // NOTE(review): this marks the latest version as modified on every run, not only when the
          // flag actually changed - confirm whether that is intended.
          if (s3Version.IsLatest) {
            modified = true;
            updated = await versionService.updateIsLatest(comsObject.id, trx);
          }

          return { modified: modified, version: updated ?? comsVersion };
        }

        // Version is not in COMS
        const mimeType = s3Version.DeleteMarker
          ? undefined // Will default to 'application/octet-stream'
          : await storageService.headObject({
            filePath: comsObject.path,
            s3VersionId: s3Version.VersionId,
            bucketId: comsObject.bucketId
          }).then(obj => obj.ContentType);

        const newVersion = await versionService.create({
          s3VersionId: s3Version.VersionId,
          mimeType: mimeType,
          id: comsObject.id,
          deleteMarker: s3Version.DeleteMarker,
          etag: s3Version.ETag,
          isLatest: s3Version.IsLatest,
          lastModifiedDate: lastModifiedDate
        }, userId, trx);
        // flagged as modified so that the sync tags/meta logic processes the new version
        return { modified: true, version: newVersion };
      }));

      if (!etrx) await trx.commit();
      return response;
    } catch (err) {
      if (!etrx && trx) await trx.rollback();
      throw err;
    }
  },

  /**
   * @function syncTags
   * Synchronizes Tag level data for a specific object version.
   * COMS tags missing from S3 are dissociated, S3 tags missing from COMS are associated.
   * For the latest version a `coms-id` tag is also added in S3 when absent and there is room for it.
   * @param {object | string} version The parent version or version uuid
   * @param {string} path String representing the canonical path for the specified object
   * @param {string} [bucketId=undefined] Optional COMS uuid of bucket
   * @param {string} [userId=SYSTEM_USER] The uuid of a user that created the sync job
   * @param {object} [etrx=undefined] An optional Objection Transaction object
   * @returns {Promise<Array<object>>} An array of synced tags that exist in both COMS and S3
   * (empty when the version is a delete marker)
   * @throws Rolls back a self-started transaction and rethrows on failure
   */
  syncTags: async (version, path, bucketId = undefined, userId = SYSTEM_USER, etrx = undefined) => {
    let trx;
    try {
      trx = etrx ? etrx : await Version.startTransaction();
      const response = [];

      // Fetch COMS version record if necessary
      const comsVersion = typeof version === 'object' ? version : await versionService.get({ versionId: version }, trx);

      // Short circuit if version is a delete marker
      if (comsVersion.deleteMarker) {
        // Close a self-started transaction so it is not left dangling
        if (!etrx) await trx.commit();
        return response;
      }

      // Check for COMS and S3 Tag statuses
      const [comsTagsResult, s3TagsResult] = await Promise.allSettled([
        tagService.fetchTagsForVersion({ versionIds: comsVersion.id }, trx),
        storageService.getObjectTagging({ filePath: path, s3VersionId: comsVersion.s3VersionId, bucketId: bucketId })
      ]);
      // A failed COMS lookup cannot be recovered from: surface the original error
      if (comsTagsResult.status === 'rejected') throw comsTagsResult.reason;

      // COMS Tags
      const comsTags = comsTagsResult.value[0]?.tagset ?? [];
      // S3 Tags (a failed S3 lookup is treated as an empty TagSet)
      const s3TagSet = s3TagsResult.value?.TagSet ?? [];
      const s3Tags = toLowerKeys(s3TagSet);

      /**
       * Add coms-id tag to latest version in S3 if not already present
       * NOTE: For a sync job the _deriveObjectId() function will have already added
       * the coms-id to latest version.
       * TODO: check if this version is still also the latest on corresponding version in S3
       */
      if (comsVersion.isLatest && s3Tags.length < 10 && !s3Tags.find(s3T => s3T.key === 'coms-id')) {
        /**
         * NOTE: adding tags to a specified version (passing a `VersionId` parameter) will affect `Last Modified`
         * attribute of multiple versions on some s3 storage providers including Dell ECS,
         * which is why no `s3VersionId` is passed here.
         */
        try {
          await storageService.putObjectTagging({
            filePath: path,
            tags: s3TagSet.concat([{
              Key: 'coms-id',
              Value: comsVersion.objectId
            }]),
            bucketId: bucketId,
          });
          // add to our arrays for comparison
          s3Tags.push({ key: 'coms-id', value: comsVersion.objectId });
        } catch (err) {
          log.warn(`Unable to add coms-id tag: ${err.message}`, { function: 'syncTags' });
        }
      }

      // Dissociate Tags not in S3
      const oldTags = comsTags.filter(comsT =>
        !s3Tags.some(s3T => s3T.key === comsT.key && s3T.value === comsT.value));
      if (oldTags.length > 0) {
        await tagService.dissociateTags(comsVersion.id, oldTags, trx);
      }

      // Associate new S3 Tags; tags already in COMS go straight into the response
      const newTags = [];
      for (const s3Tag of s3Tags) {
        if (!comsTags.some(comsT => comsT.key === s3Tag.key && comsT.value === s3Tag.value)) {
          newTags.push(s3Tag);
        } else {
          response.push(s3Tag);
        }
      }
      if (newTags.length > 0) {
        await tagService.associateTags(comsVersion.id, newTags, userId, trx);
        response.push(...newTags);
      }

      if (!etrx) await trx.commit();
      return response;
    } catch (err) {
      if (!etrx && trx) await trx.rollback();
      throw err;
    }
  },

  /**
   * @function syncMetadata
   * Synchronizes Metadata level data for a specific object version.
   * COMS metadata missing from S3 is dissociated, S3 metadata missing from COMS is associated.
   * @param {object | string} version The parent version or version uuid
   * @param {string} path String representing the canonical path for the specified object
   * @param {string} [bucketId=undefined] Optional COMS uuid of bucket
   * @param {string} [userId=SYSTEM_USER] The uuid of a user that created the sync job
   * @param {object} [etrx=undefined] An optional Objection Transaction object
   * @returns {Promise<Array<object>>} An array of synced metadata that exist in both COMS and S3
   * (empty when the version is a delete marker)
   * @throws Rolls back a self-started transaction and rethrows on failure
   */
  syncMetadata: async (version, path, bucketId = undefined, userId = SYSTEM_USER, etrx = undefined) => {
    let trx;
    try {
      trx = etrx ? etrx : await Version.startTransaction();
      const response = [];

      // Fetch COMS version record if necessary
      const comsVersion = typeof version === 'object' ? version : await versionService.get({ versionId: version }, trx);

      // Short circuit if version is a delete marker
      if (comsVersion.deleteMarker) {
        // Close a self-started transaction so it is not left dangling
        if (!etrx) await trx.commit();
        return response;
      }

      // Check for COMS and S3 Metadata statuses
      const [comsMetadataResult, s3HeadResult] = await Promise.allSettled([
        metadataService.fetchMetadataForVersion({ versionIds: comsVersion.id }, trx),
        storageService.headObject({ filePath: path, s3VersionId: comsVersion.s3VersionId, bucketId: bucketId })
      ]);
      // A failed COMS lookup cannot be recovered from: surface the original error
      if (comsMetadataResult.status === 'rejected') throw comsMetadataResult.reason;

      // COMS Metadata
      const comsMetadata = comsMetadataResult.value[0]?.metadata ?? [];
      // S3 Metadata (a failed S3 lookup is treated as having no metadata)
      const s3Metadata = getKeyValue(s3HeadResult.value?.Metadata ?? {});

      // Dissociate Metadata not in S3
      const oldMetadata = comsMetadata.filter(comsM =>
        !s3Metadata.some(s3M => s3M.key === comsM.key && s3M.value === comsM.value));
      if (oldMetadata.length > 0) {
        // Use the resolved record: `version` may be a bare uuid string without an `id` property
        await metadataService.dissociateMetadata(comsVersion.id, oldMetadata, trx);
      }

      // Associate new S3 Metadata; entries already in COMS go straight into the response
      const newMetadata = [];
      for (const s3M of s3Metadata) {
        if (!comsMetadata.some(comsM => comsM.key === s3M.key && comsM.value === s3M.value)) {
          newMetadata.push(s3M);
        } else {
          response.push(s3M);
        }
      }
      if (newMetadata.length > 0) {
        await metadataService.associateMetadata(comsVersion.id, newMetadata, userId, trx);
        response.push(...newMetadata);
      }

      if (!etrx) await trx.commit();
      return response;
    } catch (err) {
      if (!etrx && trx) await trx.rollback();
      throw err;
    }
  }
};

// Expose the sync service object as this module's CommonJS export
module.exports = service;