wkdhkr/dedupper

View on GitHub
src/services/amazon/ACDSyncService.js

Summary

Maintainability
C
1 day
Test Coverage
// @flow
import path from "path";
import pLimit from "p-limit";
import typeof { Logger } from "log4js";
import TagDbService from "../db/TagDbService";
import DbService from "../db/DbService";
import FileService from "../fs/FileService";
import SQLiteService from "../db/SQLiteService";
import ACDService from "./ACDService";
import ProcessStateDbService from "../db/ProcessStateDbService";
import type { ClassifyType } from "../../types/ClassifyTypes";
import type { Config, HashRow, ProcessStateRow } from "../../types";
import LockHelper from "../../helpers/LockHelper";
import { TYPE_IMAGE } from "../../types/ClassifyTypes";

export default class ACDSyncService {
  log: Logger;

  config: Config;

  ss: SQLiteService;

  psds: ProcessStateDbService;

  ds: DbService;

  tds: TagDbService;

  acds: ACDService;

  fs: FileService;

  constructor(config: Config) {
    this.log = config.getLogger(this);
    this.config = config;
    this.psds = new ProcessStateDbService(config);
    this.tds = new TagDbService(config);
    this.ds = new DbService(config);
    this.ss = new SQLiteService(config);
    this.fs = new FileService(config);
    this.acds = new ACDService(config);
  }

  lock: () => any = () => LockHelper.lockProcess();

  unlock: () => Promise<void> = () => LockHelper.unlockProcess();

  run: (processLimit: number, dbUnit?: number) => Promise<void> = async (
    processLimit: number,
    dbUnit: number = 1
  ) => {
    await this.lock();
    await this.psds.init();
    await this.unlock();
    let totalProcessCount = 0;
    while (totalProcessCount <= processLimit) {
      const fixedDbUnit = Math.min(processLimit, dbUnit);
      // eslint-disable-next-line no-await-in-loop
      const processCount = await this.searchAndGo(TYPE_IMAGE, fixedDbUnit);
      if (processCount === 0) {
        break;
      }
      totalProcessCount += processCount;
      this.log.info(
        `processing... count = ${totalProcessCount}, limit = ${processLimit}`
      );
    }
    this.log.info(`done.`);
  };

  isSameFile: (row: HashRow, acdFile: any) => boolean = (
    row: HashRow,
    acdFile: any
  ) => {
    if (acdFile.contentProperties && acdFile.contentProperties.image) {
      const { image } = acdFile.contentProperties;
      if (image.width === row.width && image.height === row.height) {
        return true;
      }
      if (image.width === row.height && image.height === row.width) {
        return true;
      }
    }
    return false;
  };

  detectFolderIdAndName: (toPath: string) => Promise<Array<string>> = async (
    toPath: string
  ) => {
    const acdPath = this.acds.convertLocalPath(toPath);
    const parsedPath = path.parse(acdPath);
    const acdFolderPath = parsedPath.dir;
    const folderId = await this.acds.prepareFolderPath(acdFolderPath);
    return [folderId, parsedPath.base];
  };

  async detectAcdUploadModeAndId(
    psRow: ProcessStateRow,
    row: HashRow
  ): Promise<// [null | "override" | "upload", null | string, null | string, null | string]
  any> {
    if (!row) {
      throw new Error(`file not found in db. hash = ${psRow.hash}`);
    }
    if (psRow.acd_id && (psRow.missing === -2 || psRow.missing === -3)) {
      this.log.debug(`detect acd_id, skip processing. hash = ${psRow.hash}`);
      return [null, null, psRow.acd_id, psRow.acd_md5];
    }
    const [folderId, baseName] = await this.detectFolderIdAndName(row.to_path);
    const [acdFile] = await this.acds.listSingleFile(folderId, baseName);
    if (!acdFile) {
      this.log.debug(`no acd_id, will be upload. hash = ${psRow.hash}`);
      return ["upload", folderId, null, null];
    }
    if (!(psRow.rating > 0) && this.isSameFile(row, acdFile)) {
      this.log.debug(`uploaded, record to db. hash = ${psRow.hash}`);
      return [null, folderId, acdFile.id, acdFile.contentProperties.md5];
    }
    this.log.debug(`different file. will be override. hash = ${psRow.hash}`);
    return ["override", folderId, acdFile.id, acdFile.contentProperties.md5];
  }

  uploadAcd: (row: HashRow, folderId: string) => Promise<null> = async (
    row: HashRow,
    folderId: string
  ) => {
    if (await this.fs.pathExists(row.to_path)) {
      return this.acds.upload(row.to_path, folderId);
    }
    return null;
  };

  overrideAcd: (row: HashRow, fileId: string) => Promise<null> = async (
    row: HashRow,
    fileId: string
  ) => {
    if (await this.fs.pathExists(row.to_path)) {
      return this.acds.override(row.to_path, fileId);
    }
    return null;
  };

  // eslint-disable-next-line complexity
  prepareUploadAcd: (
    psRow: ProcessStateRow,
    row: HashRow
    // eslint-disable-next-line complexity
  ) => Promise<ProcessStateRow> = async (
    psRow: ProcessStateRow,
    row: HashRow
  ) => {
    const res = await this.detectAcdUploadModeAndId(psRow, row);
    const [mode, folderId, fileId, md5] = res;
    this.log.debug(`mode = ${mode}, hash = ${row.hash}`);
    if (md5) {
      // eslint-disable-next-line no-param-reassign
      psRow.acd_md5 = md5;
    }
    if (fileId) {
      // eslint-disable-next-line no-param-reassign
      psRow.acd_id = fileId;
    }
    if (mode === null) {
      return psRow;
    }
    let fileAcdId: string | null = null;
    let fileAcdMd5: string | null = null;
    if (mode === "upload" && folderId) {
      const file = await this.uploadAcd(row, folderId);
      if (file) {
        fileAcdId = file.id;
        fileAcdMd5 = file.contentProperties.md5;
      }
    }
    if (mode === "override" && folderId && fileId) {
      const file = await this.overrideAcd(row, fileId);
      if (file) {
        fileAcdId = fileId;
        fileAcdMd5 = file.contentProperties.md5;
      }
    }
    if (fileAcdId) {
      // eslint-disable-next-line no-param-reassign
      psRow.acd_id = fileAcdId;
      if (fileAcdMd5) {
        // eslint-disable-next-line no-param-reassign
        psRow.acd_md5 = fileAcdMd5;
      }
      return psRow;
    }
    return psRow;
  };

  isUpdated: (
    missing: number,
    acdId: string,
    acdMd5: string,
    psRow: ProcessStateRow
  ) => boolean = (
    missing: number,
    acdId: string,
    acdMd5: string,
    psRow: ProcessStateRow
  ) => {
    if (missing !== psRow.missing) {
      return true;
    }
    if (acdId !== psRow.acd_id) {
      return true;
    }

    if (acdMd5 !== psRow.acd_md5) {
      return true;
    }

    return false;
  };

  // eslint-disable-next-line complexity
  processOne: (psRow: ?ProcessStateRow) => Promise<void> = async (
    psRow: ?ProcessStateRow
  ): Promise<void> => {
    try {
      if (!psRow) {
        return Promise.resolve();
      }
      const currentMissing = psRow.missing;
      const currentAcdId = psRow.acd_id;
      const currentAcdMd5 = psRow.acd_md5;

      const row = await this.ds.queryByHash(
        ({
          hash: psRow.hash,
          type: TYPE_IMAGE
        }: any)
      );
      if (!row) {
        return Promise.resolve();
      }
      const isExists = await this.fs.pathExists(row.to_path);
      // eslint-disable-next-line no-param-reassign
      psRow = await this.prepareUploadAcd(psRow, row);
      const isAcdUploaded = Boolean(psRow.acd_id);
      const isNeedless = await this.tds.isNeedless(row.hash);
      if (isAcdUploaded) {
        if (isNeedless) {
          // eslint-disable-next-line no-param-reassign
          psRow.missing = -3;
          await this.fs.delete(row.to_path);
        } else if (isExists) {
          // eslint-disable-next-line no-param-reassign
          psRow.missing = -2;
        } else {
          // eslint-disable-next-line no-param-reassign
          psRow.missing = -3;
        }
      } else if (!isExists) {
        // eslint-disable-next-line no-param-reassign
        psRow.missing = 2;
      } else {
        // eslint-disable-next-line no-param-reassign
        psRow.missing = -1;
      }
      if (this.isUpdated(currentMissing, currentAcdId, currentAcdMd5, psRow)) {
        await this.lock();
        try {
          return this.psds.insert(psRow);
        } catch (e) {
          this.log.warn(e);
        } finally {
          await this.unlock();
        }
      }
    } catch (e) {
      this.log.error(e);
    }
    return Promise.resolve();
  };

  searchAndGo(
    type: ClassifyType = TYPE_IMAGE,
    processLimit: number = 1
  ): Promise<number> {
    const limit = pLimit(parseInt(this.config.maxWorkers / 2, 10));
    return new Promise((resolve, reject) => {
      const db = this.ss.spawn<HashRow>(this.ss.detectDbFilePath(type));
      db.serialize(async () => {
        try {
          // await this.prepareTable(db);
          db.all(
            `select * from ${this.config.processStateDbName} ` +
              `where missing = -1 ` +
              `limit ${processLimit}`,
            {},
            (async (err, rows: ProcessStateRow[]) => {
              if (err) {
                reject(err);
              } else {
                await Promise.all(
                  rows.map(row => limit(() => this.processOne(row)))
                );
                resolve(rows.length);
              }
            }: any)
          );
        } catch (e) {
          reject(e);
        }
      });
    });
  }
}