src/services/ProcessService.js
// @flow
import maxListenersExceededWarning from "max-listeners-exceeded-warning";
import events from "events";
import typeof { Logger } from "log4js";
import pLimit from "p-limit";
import ProcessHelper from "../helpers/ProcessHelper";
import FileNameMarkHelper from "../helpers/FileNameMarkHelper";
import FileSystemHelper from "../helpers/FileSystemHelper";
import EnvironmentHelper from "../helpers/EnvironmentHelper";
import ReportHelper from "../helpers/ReportHelper";
import QueueHelper from "../helpers/QueueHelper";
import LoggerHelper from "../helpers/LoggerHelper";
import LockHelper from "../helpers/LockHelper";
import FileService from "./fs/FileService";
import AttributeService from "./fs/AttributeService";
import FastVideoCatalogerService from "./fs/fvc/FastVideoCatalogerService";
import DbService from "./db/DbService";
import DbRepairService from "./db/DbRepairService";
import {
TYPE_REPLACE,
TYPE_DELETE,
TYPE_SAVE,
TYPE_RELOCATE,
TYPE_TRANSFER,
TYPE_HOLD
} from "../types/ActionTypes";
import { STATE_DEDUPED } from "../types/FileStates";
import { TYPE_ARCHIVE_EXTRACT, TYPE_PROCESS_ERROR } from "../types/ReasonTypes";
import type { UserBaseConfig, Config, FileInfo } from "../types";
import type { JudgeResult, JudgeResultSimple } from "../types/JudgeResult";
import type { ActionType } from "../types/ActionTypes";
import type { ReasonType } from "../types/ReasonTypes";
import ExaminationService from "./ExaminationService";
import JudgmentService from "./judgment/JudgmentService";
export default class ProcessService {
log: Logger;
config: Config;
fileService: FileService;
judgmentService: JudgmentService;
examinationService: ExaminationService;
dbService: DbService;
dbRepairService: DbRepairService;
fvcs: FastVideoCatalogerService;
isParent: boolean;
constructor(config: Config, path: string, isParent: boolean = true) {
let { dryrun } = config;
if (EnvironmentHelper.isTest()) {
maxListenersExceededWarning();
dryrun = true;
}
let classifyTypeConfig: UserBaseConfig = {};
if (!FileSystemHelper.isDirectory(path)) {
const classifyType = AttributeService.detectClassifyTypeByConfig({
...config,
path
});
classifyTypeConfig = EnvironmentHelper.loadClassifyTypeConfig(
config.classifyTypeConfig,
classifyType
);
}
const pathMatchConfig = EnvironmentHelper.loadPathMatchConfig(
config.pathMatchConfig,
path
);
this.config = EnvironmentHelper.createConfig(
config,
pathMatchConfig,
classifyTypeConfig,
dryrun,
path
);
this.isParent = isParent;
this.log = this.config.getLogger(this);
this.fileService = new FileService(this.config);
this.judgmentService = new JudgmentService(this.config);
this.dbService = new DbService(this.config);
this.dbRepairService = new DbRepairService(this.config);
this.examinationService = new ExaminationService(
this.config,
this.fileService
);
this.fvcs = new FastVideoCatalogerService(this.config, this.fileService);
}
getResults: () => {
judge: Array<[ReasonType, string]>,
save: Array<string>,
...
} = (): { judge: [ReasonType, string][], save: string[] } => {
ReportHelper.sortResults();
return {
judge: ReportHelper.getJudgeResults(),
save: ReportHelper.getSaveResults()
};
};
async delete(fileInfo: FileInfo, [, , reason]: JudgeResult) {
const state = this.judgmentService.detectDeleteState(
reason
); /* ||
this.judgmentService.detectEraseState(reason); */
if (state) {
await this.insertToDb({
...fileInfo,
state
});
}
QueueHelper.appendOperationWaitPromise(this.fileService.delete());
}
throwError: (message: string) => empty = (message: string) => {
throw new Error(message);
};
async transfer(fileInfo: FileInfo, [, hitFile]: JudgeResult) {
if (!hitFile) {
this.throwError(
`try transfer, but transfer file missing. path = ${fileInfo.from_path}`
);
} else {
await this.fileService.delete(hitFile.to_path);
await this.save(fileInfo);
await this.insertToDb({
...DbService.rowToInfo(hitFile, fileInfo.type),
state: STATE_DEDUPED
});
}
}
async replace(fileInfo: FileInfo, [, hitFile]: JudgeResult) {
if (!hitFile) {
this.throwError(
[
"try replace, but replace file missing. path = ",
fileInfo.from_path
].join("")
);
} else {
await this.fileService.delete(hitFile.to_path);
await this.insertToDb({
...fileInfo,
to_path: await this.fileService.moveToLibrary(hitFile.to_path, true)
});
await this.insertToDb({
...DbService.rowToInfo(hitFile, fileInfo.type),
state: STATE_DEDUPED
});
ReportHelper.appendSaveResult(hitFile.to_path);
}
}
async save(
fileInfo: FileInfo,
isReplace: boolean = true,
result: ?JudgeResult = null
) {
if (result) {
const [, hitFile, reason] = result;
if (hitFile && this.judgmentService.isRecoveryReasonType(reason)) {
const toPath = await this.fileService.moveToLibrary(hitFile.to_path);
await this.dbService.insertProcessState(fileInfo);
ReportHelper.appendSaveResult(toPath);
return;
}
}
const toPath = await this.fileService.moveToLibrary();
await this.insertToDb({ ...fileInfo, to_path: toPath }, isReplace);
ReportHelper.appendSaveResult(toPath);
}
async relocate(fileInfo: FileInfo, [, hitFile]: JudgeResult) {
if (!hitFile) {
throw new Error(
`try relocate, but relocate file missing. path = ${fileInfo.from_path}`
);
}
const newToPath = await this.fileService.getFinalDestPath();
await this.insertToDb({
...fileInfo,
d_hash: hitFile.d_hash,
p_hash: hitFile.p_hash,
from_path: hitFile.from_path,
to_path: newToPath
});
await this.fileService.moveToLibrary(newToPath);
ReportHelper.appendSaveResult(newToPath);
}
async hold(
fileInfo: FileInfo,
reason: ReasonType,
results: JudgeResultSimple[]
) {
if (results.length === 0) {
await this.examinationService.rename(reason, fileInfo);
if (this.examinationService.detectMarksByReason(reason).size) {
QueueHelper.appendOperationWaitPromise(
this.examinationService.arrangeDir()
);
}
return;
}
await this.examinationService.arrange(results, fileInfo);
}
async insertToDb(fileInfo: FileInfo, isReplace: boolean = true) {
await this.dbService.insert(fileInfo, isReplace);
}
fillFileInfo: (
fileInfo: FileInfo,
action: ActionType,
reason: ReasonType
) => Promise<FileInfo> = async (
fileInfo: FileInfo,
action: ActionType,
reason: ReasonType
): Promise<FileInfo> => {
switch (action) {
case TYPE_DELETE:
if (this.judgmentService.detectDeleteState(reason)) {
return this.fileService.fillInsertFileInfo(fileInfo);
}
break;
case TYPE_HOLD:
case TYPE_RELOCATE:
break;
case TYPE_REPLACE:
case TYPE_SAVE:
case TYPE_TRANSFER:
return this.fileService.fillInsertFileInfo(fileInfo);
default:
break;
}
return fileInfo;
};
lockForSingleProcess: () => Promise<void> = async () => {
if (!EnvironmentHelper.isTest()) {
// this.log.info("start lock for single process");
// await LockHelper.lockProcess("app");
}
};
unlockForSingleProcess: () => Promise<void> = async () => {
if (!EnvironmentHelper.isTest()) {
// this.log.info("end lock for single process");
// await LockHelper.unlockProcess("app");
// await LockHelper.unlockProcess("gpu"); // for tensorflow.js
}
};
lockForWrite: () => Promise<void> = async () => {
/*
if (!this.config.pHashIgnoreSameDir) {
await LockHelper.lockProcess();
}
*/
await LockHelper.lockProcess();
};
unlockForWrite: () => Promise<void> = async () => {
/*
if (!this.config.pHashIgnoreSameDir) {
await LockHelper.unlockProcess();
}
*/
await LockHelper.unlockProcess();
};
lockForRead: (fileInfo: FileInfo) => Promise<void> = async (
fileInfo: FileInfo
) => {
await LockHelper.lockKey(fileInfo.hash);
await LockHelper.lockKey(fileInfo.to_path);
};
unlockForRead: (fileInfo: FileInfo) => Promise<void> = async (
fileInfo: FileInfo
) => {
LockHelper.unlockKey(fileInfo.hash);
LockHelper.unlockKey(fileInfo.to_path);
};
fixProcessAction(action: ActionType): ActionType {
if (action === TYPE_REPLACE && this.config.forceTransfer) {
return TYPE_TRANSFER;
}
return action;
}
async processFixedAction(
fixedAction: ActionType,
filledInfo: FileInfo,
result: JudgeResult,
reason: ReasonType,
results: JudgeResultSimple[]
) {
switch (fixedAction) {
case TYPE_DELETE:
await this.delete(filledInfo, result);
break;
case TYPE_REPLACE:
await this.replace(filledInfo, result);
break;
case TYPE_SAVE:
await this.save(filledInfo, true, result);
break;
case TYPE_TRANSFER:
await this.transfer(filledInfo, result);
break;
case TYPE_RELOCATE: {
await this.relocate(filledInfo, result);
break;
}
case TYPE_HOLD:
await this.hold(filledInfo, reason, results);
break;
default:
}
}
async processAction(
fileInfo: FileInfo,
result: JudgeResult
): Promise<boolean> {
const [action, , reason, results] = result;
if (
this.config.sweep &&
this.judgmentService.isSweepReasonType(reason) === false
) {
return true;
}
const fixedAction = this.fixProcessAction(action);
try {
await this.lockForWrite();
const filledInfo = await this.fillFileInfo(fileInfo, fixedAction, reason);
await this.processFixedAction(
fixedAction,
filledInfo,
result,
reason,
results
);
ReportHelper.appendJudgeResult(reason, filledInfo.from_path);
return true;
// eslint-disable-next-line no-useless-catch
} catch (e) {
throw e;
} finally {
await this.unlockForWrite();
await this.fileService.cleanCacheFile(
undefined,
Boolean(this.config.manual)
);
}
}
async process(): Promise<boolean> {
let result;
if (await this.fileService.isDirectory()) {
result = (await this.processDirectory()).every(Boolean);
} else {
result = await this.processFile();
}
await QueueHelper.waitOperationWaitPromises();
if (!result) {
if (this.config.path) {
ReportHelper.appendJudgeResult(TYPE_PROCESS_ERROR, this.config.path);
}
}
if (this.isParent) {
if (this.config.report) {
await ReportHelper.render(this.config.path || "");
}
await LoggerHelper.flush();
}
return result;
}
async processFiles(filePaths: string[]): Promise<boolean[]> {
const limit = pLimit(this.config.maxWorkers);
const eventEmitter = new events.EventEmitter();
eventEmitter.setMaxListeners(
eventEmitter.getMaxListeners() * this.config.maxWorkers
);
const results = await Promise.all(
filePaths.map(f =>
limit(async () => {
const ps = new ProcessService(this.config, f, false);
return ps.process();
})
)
);
await QueueHelper.waitOperationWaitPromises();
await this.fileService.deleteEmptyDirectory();
return results;
}
async processDirectory(): Promise<boolean[]> {
const filePaths = await this.fileService.collectFilePaths();
return this.processFiles(filePaths);
}
async processArchive(): Promise<boolean> {
if (this.config.archiveExtract && (await this.fileService.isArchive())) {
QueueHelper.appendOperationWaitPromise(this.fileService.extractArchive());
ReportHelper.appendJudgeResult(
TYPE_ARCHIVE_EXTRACT,
this.fileService.getSourcePath()
);
return true;
}
return false;
}
async processImportedFile(): Promise<boolean> {
const sourcePath = this.fileService.getSourcePath();
const toPath = FileNameMarkHelper.strip(sourcePath);
const type = AttributeService.detectClassifyTypeByConfig(this.config);
if (this.fileService.isLibraryPlace(toPath) === false) {
return false;
}
const hitRows = await this.dbService.queryByToPath({
type,
to_path: toPath
});
const acceptedRows = hitRows.filter(({ state }) =>
DbService.isAcceptedState(state)
);
if (acceptedRows.length) {
this.log.debug(`imported file. path = ${toPath}`);
const marks = FileNameMarkHelper.extract(sourcePath);
if (marks.size) {
await this.processRegularFile({
...DbService.rowToInfo(acceptedRows.pop()),
type,
from_path: sourcePath
});
} else {
await this.dbRepairService.fillDeepLearningTable(acceptedRows);
}
return true;
}
return false;
}
async processIrregularFile(): Promise<boolean> {
if (await this.fileService.isDeadLink()) {
await this.fileService.unlink();
return true;
}
if (await this.processImportedFile()) {
return true;
}
if (await this.processArchive()) {
return true;
}
if (await this.processFastVideoCataloger()) {
return true;
}
return false;
}
async processFastVideoCataloger(): Promise<boolean> {
if (this.fvcs.isFastVideoCatalogerExportedFile()) {
(await this.processFiles(await this.fvcs.collectBlockFiles())).every(
Boolean
);
return true;
}
return false;
}
async processRegularFile(preferFileInfo?: FileInfo): Promise<boolean> {
const fileInfo =
preferFileInfo || (await this.fileService.collectFileInfo());
try {
await this.lockForRead(fileInfo);
const isForgetType = this.judgmentService.isForgetType(fileInfo.type);
await this.fileService.prepareDir(this.config.dbBasePath, true);
const [
storedFileInfoByHash,
storedFileInfoByPHashs,
storedFileInfoByNames
] = await Promise.all(
isForgetType
? [null, [], []]
: [
this.dbService
.queryByHash(fileInfo)
.then(storedFileInfo => storedFileInfo),
this.config.pHash ? this.dbService.queryByPHash(fileInfo) : [],
this.config.useFileName &&
this.judgmentService.isWhiteListName(fileInfo.name) === false
? this.dbService.queryByName(fileInfo)
: []
]
);
return this.processAction(
...(await Promise.all([
fileInfo,
this.judgmentService.detect(
fileInfo,
storedFileInfoByHash,
storedFileInfoByPHashs,
storedFileInfoByNames
)
]))
);
// eslint-disable-next-line no-useless-catch
} catch (e) {
throw e;
} finally {
await this.unlockForRead(fileInfo);
}
}
async processFile(): Promise<boolean> {
if (QueueHelper.operationWaitPromises.length > 100) {
await QueueHelper.waitOperationWaitPromises();
}
await ProcessHelper.waitCpuIdle(this.config.maxCpuLoadPercent);
await QueueHelper.waitOperationWaitPromises();
try {
if (await this.processIrregularFile()) {
return true;
}
return this.processRegularFile();
} catch (e) {
if (EnvironmentHelper.isTest()) {
console.log(e);
}
this.log.fatal(e);
if (e.stack) {
this.log.fatal(e.stack);
}
return false;
}
}
}