NatLibFi/Annif

View on GitHub
annif/cli_util.py

Summary

Maintainability
A
50 mins
Test Coverage
"""Utility functions for Annif CLI commands"""

from __future__ import annotations

import collections
import itertools
import os
import sys
from typing import TYPE_CHECKING

import click
import click_log
from flask import current_app

import annif
from annif.exception import ConfigurationException
from annif.project import Access

if TYPE_CHECKING:
    import io
    from datetime import datetime

    from click.core import Argument, Context, Option

    from annif.corpus.document import DocumentCorpus, DocumentList
    from annif.corpus.subject import SubjectIndex
    from annif.project import AnnifProject
    from annif.suggestion import SuggestionResult
    from annif.vocab import AnnifVocabulary

logger = annif.logger


def _set_project_config_file_path(
    ctx: Context, param: Option, value: str | None
) -> None:
    """Override the default path or the path given in env by CLI option"""
    with ctx.obj.load_app().app_context():
        if value:
            current_app.config["PROJECTS_CONFIG_PATH"] = value


def common_options(f):
    """Decorator to add common options for all CLI commands"""
    f = click.option(
        "-p",
        "--projects",
        help="Set path to project configuration file or directory",
        type=click.Path(dir_okay=True, exists=True),
        callback=_set_project_config_file_path,
        expose_value=False,
        is_eager=True,
    )(f)
    return click_log.simple_verbosity_option(logger)(f)


def project_id(f):
    """Decorator to add a project ID parameter to a CLI command"""
    return click.argument("project_id", shell_complete=complete_param)(f)


def backend_param_option(f):
    """Decorator to add an option for CLI commands to override BE parameters"""
    return click.option(
        "--backend-param",
        "-b",
        multiple=True,
        help="Override backend parameter of the config file. "
        + "Syntax: `-b <backend>.<parameter>=<value>`.",
    )(f)


def docs_limit_option(f):
    """Decorator to add an option for CLI commands to limit the number of documents to
    use"""
    return click.option(
        "--docs-limit",
        "-d",
        default=None,
        type=click.IntRange(0, None),
        help="Maximum number of documents to use",
    )(f)


def get_project(project_id: str) -> AnnifProject:
    """
    Helper function to get a project by ID and bail out if it doesn't exist"""
    try:
        return annif.registry.get_project(project_id, min_access=Access.private)
    except ValueError:
        click.echo("No projects found with id '{0}'.".format(project_id), err=True)
        sys.exit(1)


def get_vocab(vocab_id: str) -> AnnifVocabulary:
    """
    Helper function to get a vocabulary by ID and bail out if it doesn't
    exist"""
    try:
        return annif.registry.get_vocab(vocab_id, min_access=Access.private)
    except ValueError:
        click.echo(f"No vocabularies found with the id '{vocab_id}'.", err=True)
        sys.exit(1)


def make_list_template(*rows) -> str:
    """Helper function to create a template for a list of entries with fields of
    variable width. The width of each field is determined by the longest item in the
    field in the given rows."""

    max_field_widths = collections.defaultdict(int)
    for row in rows:
        for field_ind, item in enumerate(row):
            max_field_widths[field_ind] = max(max_field_widths[field_ind], len(item))

    return "  ".join(
        [
            f"{{{field_ind}: <{field_width}}}"
            for field_ind, field_width in max_field_widths.items()
        ]
    )


def format_datetime(dt: datetime | None) -> str:
    """Helper function to format a datetime object as a string in the local time."""
    if dt is None:
        return "-"
    return dt.astimezone().strftime("%Y-%m-%d %H:%M:%S")


def open_documents(
    paths: tuple[str, ...],
    subject_index: SubjectIndex,
    vocab_lang: str,
    docs_limit: int | None,
) -> DocumentCorpus:
    """Helper function to open a document corpus from a list of pathnames,
    each of which is either a TSV file or a directory of TXT files. For
    directories with subjects in TSV files, the given vocabulary language
    will be used to convert subject labels into URIs. The corpus will be
    returned as an instance of DocumentCorpus or LimitingDocumentCorpus."""

    def open_doc_path(path, subject_index):
        """open a single path and return it as a DocumentCorpus"""
        if os.path.isdir(path):
            return annif.corpus.DocumentDirectory(
                path, subject_index, vocab_lang, require_subjects=True
            )
        return annif.corpus.DocumentFile(path, subject_index)

    if len(paths) == 0:
        logger.warning("Reading empty file")
        docs = open_doc_path(os.path.devnull, subject_index)
    elif len(paths) == 1:
        docs = open_doc_path(paths[0], subject_index)
    else:
        corpora = [open_doc_path(path, subject_index) for path in paths]
        docs = annif.corpus.CombinedCorpus(corpora)
    if docs_limit is not None:
        docs = annif.corpus.LimitingDocumentCorpus(docs, docs_limit)
    return docs


def open_text_documents(paths: tuple[str, ...], docs_limit: int | None) -> DocumentList:
    """
    Helper function to read text documents from the given file paths. Returns a
    DocumentList object with Documents having no subjects. If a path is "-", the
    document text is read from standard input. The maximum number of documents to read
    is set by docs_limit parameter.
    """

    def _docs(paths):
        for path in paths:
            if path == "-":
                doc = annif.corpus.Document(text=sys.stdin.read(), subject_set=None)
            else:
                with open(path, errors="replace", encoding="utf-8-sig") as docfile:
                    doc = annif.corpus.Document(text=docfile.read(), subject_set=None)
            yield doc

    return annif.corpus.DocumentList(_docs(paths[:docs_limit]))


def show_hits(
    hits: SuggestionResult,
    project: AnnifProject,
    lang: str,
    file: io.TextIOWrapper | None = None,
) -> None:
    """
    Print subject suggestions to the console or a file. The suggestions are displayed as
    a table, with one row per hit. Each row contains the URI, label, possible notation,
    and score of the suggestion. The label is given in the specified language.
    """
    template = "<{}>\t{}\t{:.04f}"
    for hit in hits:
        subj = project.subjects[hit.subject_id]
        line = template.format(
            subj.uri,
            "\t".join(filter(None, (subj.labels[lang], subj.notation))),
            hit.score,
        )
        click.echo(line, file=file)


def parse_backend_params(
    backend_param: tuple[str, ...] | tuple[()], project: AnnifProject
) -> collections.defaultdict[str, dict[str, str]]:
    """Parse a list of backend parameters given with the --backend-param
    option into a nested dict structure"""
    backend_params = collections.defaultdict(dict)
    for beparam in backend_param:
        backend, param = beparam.split(".", 1)
        key, val = param.split("=", 1)
        _validate_backend_params(backend, beparam, project)
        backend_params[backend][key] = val
    return backend_params


def _validate_backend_params(backend: str, beparam: str, project: AnnifProject) -> None:
    if backend != project.config["backend"]:
        raise ConfigurationException(
            'The backend {} in CLI option "-b {}" not matching the project'
            " backend {}.".format(backend, beparam, project.config["backend"])
        )


def generate_filter_params(filter_batch_max_limit: int) -> list[tuple[int, float]]:
    limits = range(1, filter_batch_max_limit + 1)
    thresholds = [i * 0.05 for i in range(20)]
    return list(itertools.product(limits, thresholds))


def _get_completion_choices(
    param: Argument,
) -> dict[str, AnnifVocabulary] | dict[str, AnnifProject] | list:
    if param.name in ("project_id", "project_ids_pattern"):
        return annif.registry.get_projects()
    elif param.name == "vocab_id":
        return annif.registry.get_vocabs()
    else:
        return []


def complete_param(ctx: Context, param: Argument, incomplete: str) -> list[str]:
    with ctx.obj.load_app().app_context():
        return [
            choice
            for choice in _get_completion_choices(param)
            if choice.startswith(incomplete)
        ]