huridocs/uwazi

View on GitHub
app/api/topicclassification/sync.ts

Summary

Maintainability
B
6 hrs
Test Coverage
B
82%
import entities from 'api/entities';
import { EnforcedWithId, QueryForEach } from 'api/odm';
import templates from 'api/templates';
import thesauri from 'api/thesauri';
import { extractSequence, listModels } from 'api/topicclassification';
import { getModel, fetchSuggestions } from 'api/topicclassification/api';
import { buildFullModelName } from 'shared/commonTopicClassification';
import { provenanceTypes } from 'shared/provenanceTypes';
import { Task, TaskProvider } from 'shared/tasks/tasks';
import { PropertySchema } from 'shared/types/commonTypes';
import { EntitySchema } from 'shared/types/entityType';
import { TemplateSchema } from 'shared/types/templateType';
import { ThesaurusSchema } from 'shared/types/thesaurusType';
import * as util from 'util';
import { ensure } from 'shared/tsUtils';

/** Options accepted by the topic classification sync task and by syncEntity. */
interface SyncArgs {
  /**
   * Cap on the number of changed entities; once it is reached the task skips
   * the remaining entities. The task falls back to 1000000 when unset.
   */
  limit?: number;
  /**
   * 'onlynew' stores fetched suggestions in the entity's suggestedMetadata;
   * 'autoaccept' writes sufficiently confident suggestions straight into
   * metadata, tagged with bulk provenance.
   */
  mode: 'onlynew' | 'autoaccept';
  /** When set, only this model is considered; the task throws if it is not found. */
  model?: string;
  /** Changed entities are only saved when this is truthy; otherwise the run is a dry run. */
  noDryRun?: boolean;
  /**
   * Allows replacing existing suggestions ('onlynew') or existing
   * bulk-provenance metadata values ('autoaccept').
   */
  overwrite?: boolean;
  /** Minimum suggestion_confidence for a suggestion to be auto-accepted (treated as 0 when unset). */
  autoAcceptConfidence?: number;
  /** Batch size handed to QueryForEach; the task falls back to 50 when unset or 0. */
  batchSize?: number;
}

/**
 * 'onlynew' mode: fetches suggestions for a single property and stores them in
 * `e.suggestedMetadata` (the entity is mutated in place).
 *
 * The property is skipped (returns false) when:
 *  - it already has at least one metadata value,
 *  - it already has suggestions and `args.overwrite` is not set,
 *  - `latestModelVersion` is given and the highest `suggestion_model` among the
 *    existing suggestions is not older than it (plain string comparison),
 *  - no suggestions come back, or they serialize identically to the stored ones.
 *
 * @param e entity to update; `metadata` and `suggestedMetadata` must already exist.
 * @param args sync options; only `overwrite` is read here.
 * @param prop template property whose suggestions are being synced.
 * @param thes thesaurus forwarded to fetchSuggestions.
 * @param latestModelVersion version string to compare existing suggestions against.
 * @returns true when `e.suggestedMetadata[prop.name]` was replaced.
 */
// eslint-disable-next-line max-params, max-statements
async function handlePropOnlynew(
  e: EntitySchema,
  args: SyncArgs,
  prop: PropertySchema,
  thes: ThesaurusSchema,
  latestModelVersion?: string
): Promise<boolean> {
  if (e.metadata![prop.name!] && (e.metadata![prop.name!] ?? []).length) {
    return false;
  }
  if (!args.overwrite && e.suggestedMetadata![prop.name!]) {
    return false;
  }
  if (latestModelVersion) {
    const currentModel = e
      .suggestedMetadata![prop.name!]?.map(v => v.suggestion_model)
      .reduce((max, s) => ((s || '') > max! ? s : max), '');
    if (currentModel && currentModel >= latestModelVersion) {
      return false;
    }
  }
  // Only extract the sequence once every cheap guard has passed, so skipped
  // properties do not pay for the async extraction.
  const seq = await extractSequence(e);
  const suggestions = await fetchSuggestions(e, prop, seq, thes);
  if (
    !suggestions ||
    JSON.stringify(suggestions) === JSON.stringify(e.suggestedMetadata![prop.name!])
  ) {
    return false;
  }
  e.suggestedMetadata![prop.name!] = suggestions;
  return true;
}

/**
 * 'autoaccept' mode: fetches suggestions for a single property and writes the
 * ones whose confidence reaches `args.autoAcceptConfidence` directly into
 * `e.metadata`, tagged with bulk provenance (the entity is mutated in place).
 *
 * A property that already has values is only touched when all of its values
 * carry bulk provenance, `args.overwrite` is set and, if `latestModelVersion`
 * is given, the highest `suggestion_model` among the values is older than it
 * (plain string comparison).
 *
 * @param e entity to update; `metadata` must already exist.
 * @param args sync options; `overwrite` and `autoAcceptConfidence` are read here.
 * @param prop template property whose values are being synced.
 * @param thes thesaurus forwarded to fetchSuggestions.
 * @param latestModelVersion version string to compare existing values against.
 * @returns true when `e.metadata[prop.name]` was replaced.
 */
// eslint-disable-next-line max-params, max-statements
async function handlePropAutoaccept(
  e: EntitySchema,
  args: SyncArgs,
  prop: PropertySchema,
  thes: ThesaurusSchema,
  latestModelVersion?: string
): Promise<boolean> {
  if (e.metadata![prop.name!] && (e.metadata![prop.name!] ?? []).length) {
    // Never overwrite values that were not produced by a bulk sync.
    if (!e.metadata![prop.name!]!.every(v => v.provenance === provenanceTypes.bulk)) {
      return false;
    }
    if (!args.overwrite) {
      return false;
    }
    if (latestModelVersion) {
      const currentModel = e
        .metadata![prop.name!]?.map(v => v.suggestion_model)
        .reduce((max, s) => ((s || '') > max! ? s : max), '');
      if (currentModel && currentModel >= latestModelVersion) {
        return false;
      }
    }
  }
  // Only extract the sequence once every cheap guard has passed, so skipped
  // properties do not pay for the async extraction.
  const seq = await extractSequence(e);
  const suggestions = await fetchSuggestions(e, prop, seq, thes);
  if (!suggestions) {
    return false;
  }
  const toApply = suggestions
    .filter(s => (s.suggestion_confidence ?? 0) >= (args.autoAcceptConfidence ?? 0))
    .map(s => ({ ...s, provenance: provenanceTypes.bulk }));
  // NOTE(review): when the property is undefined and nothing passes the
  // threshold, '[]' !== undefined, so an empty array is written and the entity
  // is reported as changed — confirm this is intended.
  if (JSON.stringify(toApply) === JSON.stringify(e.metadata![prop.name!])) {
    return false;
  }
  e.metadata![prop.name!] = toApply;
  return true;
}

/**
 * Syncs topic classification results into a single entity, one template
 * property at a time. The entity is mutated in place (missing `metadata` /
 * `suggestedMetadata` are initialised to `{}`); it is NOT saved here.
 *
 * Only properties whose `content` resolves to a known thesaurus are handled,
 * and, when `availableModels` is given, only those whose full model name has
 * an entry in it.
 *
 * @param e entity to update.
 * @param args sync options; `mode` selects the per-property handler.
 * @param templateDictP templates keyed by id; the entity's template is loaded
 *   via templates.getById when it is not found here.
 * @param thesaurusDictP thesauri keyed by id; all thesauri are loaded when omitted.
 * @param availableModels model name -> version; its value for the property's
 *   model is passed to the handler as the latest model version.
 * @returns true when at least one property of the entity was changed.
 */
async function syncEntity(
  e: EntitySchema,
  args: SyncArgs,
  templateDictP?: { [k: string]: TemplateSchema },
  thesaurusDictP?: { [k: string]: ThesaurusSchema },
  availableModels?: { [k: string]: string | undefined }
): Promise<boolean> {
  if (!e.metadata) {
    e.metadata = {};
  }
  if (!e.suggestedMetadata) {
    e.suggestedMetadata = {};
  }
  const template: TemplateSchema | undefined =
    (templateDictP ?? {})[e.template?.toString() ?? ''] ??
    (await templates.getById(ensure<string>(e.template)));
  const thesaurusDict =
    thesaurusDictP ??
    (await thesauri.get(null)).reduce<Record<string, EnforcedWithId<ThesaurusSchema>>>(
      (res, t) => ({ ...res, [t._id.toString()]: t }),
      {}
    );
  // Each callback reports its own outcome. Accumulating into a shared flag
  // across awaits (`flag = flag || (await ...)`) loses updates: every callback
  // reads the flag as false before awaiting, so a handler that finishes later
  // with `false` would overwrite an earlier `true`.
  const changedPerProp = await Promise.all(
    (template?.properties ?? []).map(async (prop): Promise<boolean> => {
      const thesaurus = thesaurusDict[prop?.content ?? ''];
      if (!prop || !thesaurus) {
        return false;
      }
      const modelName = buildFullModelName(thesaurus.name);
      if (availableModels !== undefined && !availableModels[modelName]) {
        return false;
      }
      const latestModelVersion = (availableModels ?? {})[modelName];
      if (args.mode === 'onlynew') {
        return handlePropOnlynew(e, args, prop, thesaurus, latestModelVersion);
      }
      if (args.mode === 'autoaccept') {
        return handlePropAutoaccept(e, args, prop, thesaurus, latestModelVersion);
      }
      return false;
    })
  );
  return changedPerProp.some(changed => changed);
}

/**
 * Lists the models known to the topic classification backend and resolves the
 * `preferred` value reported by getModel for each of them.
 *
 * @param fixedModel when given, only the model with exactly this name is considered.
 * @returns a dictionary from model name to its `preferred` value (models
 *   without a truthy `preferred` are left out), or `{ error }` when listing
 *   the models failed.
 */
async function getAvailableModels(
  fixedModel?: string
): Promise<{ [k: string]: string | undefined; error?: string }> {
  const models = (await listModels()) ?? { error: 'Internal error in calling backend.' };
  if (models.error) {
    return { error: `Suggestion sync aborted: ${models.error}` };
  }
  const wanted = models.models.filter(m => !fixedModel || m === fixedModel);
  // Promise.all attaches a handler to every lookup, so a failing getModel call
  // surfaces as one rejection of this function instead of leaving another
  // pending lookup unhandled (as a chained async reducer can).
  const preferred = await Promise.all(
    wanted.map(async m => {
      const model = await getModel(m);
      return model && model.preferred ? model.preferred : undefined;
    })
  );
  const available: { [k: string]: string | undefined; error?: string } = {};
  wanted.forEach((m, i) => {
    if (preferred[i]) {
      available[m] = preferred[i];
    }
  });
  return available;
}

/**
 * Background task that walks over the entities and syncs topic classification
 * results into them via syncEntity. Progress is reported through
 * `this.status.message` and `this.status.result` (`total`, `seen`, and `index`
 * = number of changed entities).
 */
class SyncTask extends Task {
  /**
   * Runs the sync.
   *
   * Stops early (with a status message) when the model list cannot be fetched
   * or is empty. Entities are only saved when `args.noDryRun` is set.
   *
   * @param args sync options, see SyncArgs.
   * @throws Error when `args.model` is set but no such model is available.
   */
  // eslint-disable-next-line max-statements
  protected async run(args: SyncArgs) {
    this.status.message = `Started with ${util.inspect(args)}`;
    const models = await getAvailableModels(args.model);
    if (models.error) {
      this.status.message = models.error;
      return;
    }
    // Bail out before loading templates and thesauri when there is nothing to sync.
    if (!Object.keys(models).length) {
      if (args.model) {
        throw new Error(`The selected model ${args.model} was not found!`);
      }
      this.status.message =
        'No suggestions to sync: Topic Classification server does not have any models (yet).';
      return;
    }
    const templatesDict = (await templates.get(null)).reduce(
      (res, t) => ({ ...res, [t._id.toString()]: t }),
      {}
    );
    const thesaurusDict = (await thesauri.get(null)).reduce(
      (res, t) => ({ ...res, [t._id.toString()]: t }),
      {} as { [k: string]: ThesaurusSchema }
    );
    const res = this.status.result;
    res.total = await entities.count({ language: 'en' });
    res.seen = 0;
    res.index = 0;
    const limit = args.limit ?? 1000000;
    await QueryForEach(args.batchSize || 50, async e => {
      // `>=` so that at most `limit` entities are changed; a strict `>` would
      // let one extra entity through.
      if (res.index >= limit) {
        return;
      }
      res.seen += 1;
      if (await syncEntity(e, args, templatesDict, thesaurusDict, models)) {
        res.index += 1;
        if (args.noDryRun) {
          await entities.save(e, { user: 'sync-topic-classification', language: e.language });
        }
      }
      this.status.message =
        `Updating suggestions in the background: ${res.seen} of ${res.total} documents processed, ` +
        `${res.index} changed. Sync arguments are ${util.inspect(args)}.`;
    });
  }
}

// Register the sync task class with the TaskProvider under the name
// 'TopicClassificationSync'.
TaskProvider.registerClass('TopicClassificationSync', SyncTask);

// syncEntity is exported as a value; SyncArgs is exported as a type only.
export { syncEntity };
export type { SyncArgs };