# bel/resources/namespace.py

# Standard Library
import copy
import json
from collections import defaultdict
from typing import IO, Optional

# Third Party
import cachetools
from loguru import logger

# Local
import bel.core.settings as settings
import bel.db.elasticsearch as elasticsearch
from bel.db.arangodb import (
    arango_id_to_key,
    batch_load_docs,
    equiv_edges_name,
    equiv_nodes_name,
    resources_db,
    resources_metadata_coll,
    terms_coll_name,
)
from bel.db.elasticsearch import es
from bel.schemas.terms import Namespace

# key = ns:id
# main_key = the preferred key, e.g. ns:<primary_id> - not an alt_key, an obsolete
#            key, or an equivalence key (an equivalence key may itself be an alt_key)
# alt_key = a secondary identifier, e.g. for HGNC the preferred key is HGNC:391
#           and HGNC:AKT1 is an alt_key (AKT1 is a secondary ID)
# db_key = key converted to the ArangoDB _key format
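#
# Illustrative example (hypothetical db_key value):
#   main_key = "HGNC:391"
#   alt_key  = "HGNC:AKT1"
#   db_key   = arango_id_to_key("HGNC:391")  # sanitized to fit ArangoDB _key rules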


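# Single-entry TTL caches: each memoizes one result for up to 600 seconds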
namespace_metadata_cache = cachetools.TTLCache(maxsize=1, ttl=600)
bel_resource_metadata_cache = cachetools.TTLCache(maxsize=1, ttl=600)


def remove_old_db_entries(namespace: str, version: str = "", force: bool = False):
    """Remove old database entries

    Args:
        namespace: preferred namespace prefix, e.g. HGNC or DO
        version: version of last namespace loaded - used to remove older entries from arangodb
        force: remove ALL namespace database entries
    """

    if force or version == "":
        filter_version = ""
    else:
        filter_version = f"""FILTER doc.version != "{version}" """

    # Clean up old entries
    remove_old_terms = f"""
        FOR doc IN {terms_coll_name}
            FILTER doc.namespace == "{namespace}"
            {filter_version}
            REMOVE doc IN {terms_coll_name}
    """

    remove_old_equivalence_edges = f"""
        FOR doc IN {equiv_edges_name}
            FILTER doc.source == "{namespace}"
            {filter_version}
            REMOVE doc IN {equiv_edges_name}
    """

    remove_old_equivalence_nodes = f"""
        FOR doc IN {equiv_nodes_name}
            FILTER doc.source == "{namespace}"
            {filter_version}
            REMOVE doc IN {equiv_nodes_name}
    """

    resources_db.aql.execute(remove_old_terms, ttl=7200)
    resources_db.aql.execute(remove_old_equivalence_edges, ttl=7200)
    resources_db.aql.execute(remove_old_equivalence_nodes, ttl=7200)
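
# Usage sketch (hypothetical namespace and version values):
#
#   remove_old_db_entries("HGNC", version="2023-01-15T12:00:00")  # drop all other versions
#   remove_old_db_entries("HGNC", force=True)                     # drop ALL HGNC entries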


def load_terms(
    f: IO, metadata: dict, force: bool = False, resource_download_url: Optional[str] = None
):
    """Load terms into Elasticsearch and ArangoDB

    Force will create a new index in Elasticsearch regardless of whether
    an index with the resource version already exists.

    Args:
        f: file handle for the terminology file (JSON lines)
        metadata: dict containing the metadata for the terminology
        force: force a full update - e.g. remove and re-add the elasticsearch index
               and delete arangodb namespace records before loading
        resource_download_url: optional URL recorded in the resource metadata
    """

    result = {"state": "Succeeded", "messages": []}

    metadata["statistics"] = {
        "entities_count": 0,
        "synonyms_count": 0,
        "entity_types": defaultdict(int),
        "annotation_types": defaultdict(int),
        "equivalenced_namespaces": defaultdict(int),
    }

    metadata_key = f"Namespace_{metadata['namespace']}"
    prior_metadata = resources_metadata_coll.get(metadata_key)

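    # prior_metadata may be None (first load) or missing expected fields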
    try:
        prior_version = prior_metadata.get("version", "")
        prior_entity_count = prior_metadata["statistics"].get("entities_count", 0)
    except Exception:
        prior_entity_count = 0
        prior_version = ""

    namespace = metadata["namespace"]
    version = metadata["version"]
    es_version = version.replace("T", "").replace("-", "").replace(":", "")
    index_prefix = f"{settings.TERMS_INDEX}_{namespace.lower()}"
    index_name = f"{index_prefix}_{es_version}"

    ################################################################################
    # Elasticsearch index processing
    ################################################################################
    # Create index with mapping
    if force or prior_version != version:
        elasticsearch.create_terms_index(index_name)
    else:
        result["state"] = "Succeeded"
        result["messages"].append(
            f'NOTE: This namespace {namespace} at version {version} is already loaded and the "force" option was not used'
        )

        return result

    # Intentionally relies on a side effect: terms_iterator_for_elasticsearch
    # populates metadata["statistics"] as it yields documents
    terms_iterator = terms_iterator_for_elasticsearch(f, index_name, metadata)
    elasticsearch.bulk_load_docs(terms_iterator)

    # Remove old indexes for this namespace
    index_names = elasticsearch.get_all_index_names()
    for name in index_names:
        # Match the prefix plus separator so that e.g. terms_go does not also
        # match indexes of another namespace such as terms_gobp
        if name != index_name and name.startswith(f"{index_prefix}_"):
            elasticsearch.delete_index(name)

    if not force and prior_entity_count > metadata["statistics"]["entities_count"]:
        logger.error(
            f'Problem loading namespace: {namespace}, previous entity count: {prior_entity_count}, current load entity count: {metadata["statistics"]["entities_count"]}'
        )

        result["state"] = "Failed"
        result["messages"].append(
            f'ERROR: Problem loading namespace: {namespace}, previous entity count: {prior_entity_count}, current load entity count: {metadata["statistics"]["entities_count"]}'
        )
        return result

    elif force and prior_entity_count > metadata["statistics"]["entities_count"]:
        result["state"] = "Warning"
        result["messages"].append(
            f'WARNING: New namespace: {namespace} is smaller, previous entity count: {prior_entity_count}, current load entity count: {metadata["statistics"]["entities_count"]}'
        )

    # Add terms alias to this index
    elasticsearch.add_index_alias(index_name, settings.TERMS_INDEX)

    ################################################################################
    # Arangodb collection loading
    ################################################################################
    if force:
        remove_old_db_entries(namespace, version=version, force=True)

    # LOAD terms and equivalences INTO ArangoDB
    # on_duplicate="update" preserves the existing "primary" flag on
    # equivalence nodes rather than overwriting it
    batch_load_docs(resources_db, terms_iterator_for_arangodb(f, version), on_duplicate="update")

    # Add metadata to resource metadata collection
    metadata["_key"] = metadata_key

    if resource_download_url is not None:
        metadata["resource_download_url"] = resource_download_url

    resources_metadata_coll.insert(metadata, overwrite=True)
    clear_resource_metadata_cache()

    if not force:
        remove_old_db_entries(namespace, version=version)

    msg = f'Loaded Namespace: {namespace} with {metadata["statistics"]["entities_count"]} terms into elasticsearch: {settings.TERMS_INDEX}.{index_name} and arangodb collection: {terms_coll_name}'

    logger.info(msg, namespace=metadata["namespace"])

    result["messages"].append(msg)
    return result
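
# A minimal usage sketch for load_terms (hypothetical path and URL; the file
# layout - gzipped JSON lines with a leading metadata record under a
# "metadata" key - is an assumption based on how the iterators below skip
# non-"term" records):
#
#   import gzip
#   with gzip.open("hgnc.jsonl.gz", "rt") as fh:
#       metadata = json.loads(fh.readline())["metadata"]
#       result = load_terms(fh, metadata, resource_download_url="https://example.org/hgnc.jsonl.gz")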


def terms_iterator_for_arangodb(f: IO, version: str):
    """Generator for loading namespace terms into arangodb"""

    species_list = settings.BEL_FILTER_SPECIES

    f.seek(0)

    for line in f:
        term = json.loads(line)
        # skip if not term record (e.g. is a metadata record)
        if "term" not in term:
            continue
        term = term["term"]
        term_key = term["key"]
        namespace = term["namespace"]

        # Skip if species not listed in config species_list
        species_key = term.get("species_key", None)
        if species_list and species_key and species_key not in species_list:
            continue

        # The original key can't be used directly as the ArangoDB _key because
        # some keys exceed the allowed length (_key must be < 255 chars)
        term_db_key = arango_id_to_key(term_key)

        term["_key"] = term_db_key
        term["version"] = version

        # Add term record to terms collection
        yield (terms_coll_name, term)

        # Add primary ID node
        yield (
            equiv_nodes_name,
            {
                "_key": term_db_key,
                "key": term["key"],  # BEL Key - ns:id
                "primary": True,
                "namespace": namespace,
                "source": namespace,
                "version": version,
            },
        )

        # Create Alt ID nodes/equivalences (to support other database equivalences using non-preferred Namespace IDs)
        if "alt_keys" in term:
            for alt_key in term["alt_keys"]:
                alt_db_key = arango_id_to_key(alt_key)

                yield (
                    equiv_nodes_name,
                    {
                        "_key": alt_db_key,
                        "key": alt_key,
                        "namespace": alt_key.split(":", 1)[0],
                        "source": namespace,
                        "version": version,
                    },
                )

                # Ensure only one edge per pair
                if term_db_key < alt_db_key:
                    from_ = term_db_key
                    to_ = alt_db_key
                else:
                    from_ = alt_db_key
                    to_ = term_db_key

                # Add edges for alt_keys
                arango_edge = {
                    "_from": f"{equiv_nodes_name}/{from_}",
                    "_to": f"{equiv_nodes_name}/{to_}",
                    "_key": arango_id_to_key(f"{from_}>>{to_}"),
                    "type": "equivalent_to",
                    "source": namespace,
                    "version": version,
                }
                yield (equiv_edges_name, arango_edge)

        # Cross-Namespace equivalences
        if "equivalence_keys" in term:
            for eqv_key in term["equivalence_keys"]:
                eqv_db_key = arango_id_to_key(eqv_key)

                equiv_node = (
                    equiv_nodes_name,
                    {
                        "_key": eqv_db_key,
                        "key": eqv_key,
                        "namespace": eqv_key.split(":", 1)[0],
                        "source": namespace,
                        "version": version,
                    },
                )

                yield equiv_node

                # Ensure only one edge per pair
                if term_db_key < eqv_db_key:
                    from_ = term_db_key
                    to_ = eqv_db_key
                else:
                    from_ = eqv_db_key
                    to_ = term_db_key

                equiv_edge = (
                    equiv_edges_name,
                    {
                        "_from": f"{equiv_nodes_name}/{from_}",
                        "_to": f"{equiv_nodes_name}/{to_}",
                        "_key": arango_id_to_key(f"{from_}>>{to_}"),
                        "type": "equivalent_to",
                        "source": namespace,
                        "version": version,
                    },
                )

                yield equiv_edge
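
# Shape sketch of the tuples the generator above yields (hypothetical values;
# the collection names come from bel.db.arangodb):
#
#   (terms_coll_name,  {"_key": db_key, "key": "HGNC:391", "namespace": "HGNC", ...})
#   (equiv_nodes_name, {"_key": db_key, "key": "HGNC:391", "primary": True, ...})
#   (equiv_edges_name, {"_from": f"{equiv_nodes_name}/{from_}", "_to": f"{equiv_nodes_name}/{to_}",
#                       "type": "equivalent_to", ...})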


def terms_iterator_for_elasticsearch(f: IO, index_name: str, metadata: dict):
    """Add index_name to term documents for bulk load"""

    species_list = settings.BEL_FILTER_SPECIES

    f.seek(0)  # Seek back to beginning of file

    for line in f:
        term = json.loads(line)
        # skip if not term record (e.g. is a metadata record)
        if "term" not in term:
            continue
        term = term["term"]

        # Collect statistics
        metadata["statistics"]["entities_count"] += 1
        metadata["statistics"]["synonyms_count"] += len(term.get("synonyms", []))
        for entity_type in term.get("entity_types", []):
            metadata["statistics"]["entity_types"][entity_type] += 1
        for annotation_type in term.get("annotation_types", []):
            metadata["statistics"]["annotation_types"][annotation_type] += 1
        for equivalence in term.get("equivalence_keys", []):
            ns, id_ = equivalence.split(":", 1)
            metadata["statistics"]["equivalenced_namespaces"][ns] += 1

        # Skip if species not listed in config species_list
        species_key = term.get("species_key", None)
        if species_list and species_key and species_key not in species_list:
            continue

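        # Index the main key, the alt keys, and a lowercased variant of each
        # so that case-insensitive key lookups match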
        all_term_keys = set()
        for term_key in [term["key"]] + term.get("alt_keys", []):
            all_term_keys.add(term_key)
            all_term_keys.add(lowercase_term_id(term_key))

        term["alt_keys"] = list(all_term_keys)

        term.pop("child_keys", "")
        term.pop("parent_keys", "")
        term.pop("equivalence_keys", "")

        # Terms without a species must not carry an empty species_key attribute,
        # so that naked NSArg queries filtered by species can still match
        # non-species terms
        if term.get("species_key", "") == "":
            term.pop("species_key", None)
            term.pop("species_label", None)

        record = {
            "_op_type": "index",
            "_index": index_name,
            "_type": "term",
            "_id": term["key"],
            "_source": copy.deepcopy(term),
        }

        yield record
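
# Each yielded record is an Elasticsearch bulk "index" action, e.g.
# (hypothetical values):
#
#   {"_op_type": "index", "_index": "terms_hgnc_20230115120000",
#    "_type": "term", "_id": "HGNC:391", "_source": {...}}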


def lowercase_term_id(term_key: str) -> str:
    """Lowercase the term value (not the namespace prefix)

    Args:
        term_key: term identifier with namespace prefix, e.g. MESH:Atherosclerosis

    Returns:
        str: key with lowercased value, e.g. MESH:atherosclerosis
    """

    (ns, val) = term_key.split(":", 1)
    term_key = f"{ns}:{val.lower()}"

    return term_key


def clear_resource_metadata_cache():
    """Clear the namespace metadata cache

    Called when we update the namespace metadata
    """

    namespace_metadata_cache.clear()
    bel_resource_metadata_cache.clear()


# TODO - refactor get_namespace_metadata and get_bel_resource_metadata into one function


@cachetools.cached(namespace_metadata_cache)
def get_namespace_metadata():
    """Get namespace metadata"""

    namespaces = {}
    for namespace in resources_metadata_coll:
        if namespace.get("resource_type", None) != "namespace":
            continue

        if namespace["source_url"] == "":
            namespace["source_url"] = None

        namespace = Namespace(**namespace)
        namespaces[namespace.namespace] = namespace

    return namespaces
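
# Usage sketch: results are cached per the TTL above; call
# clear_resource_metadata_cache() after writes to force a refresh.
#
#   namespaces = get_namespace_metadata()
#   hgnc = namespaces.get("HGNC")  # bel.schemas.terms.Namespace or None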


@cachetools.cached(bel_resource_metadata_cache)
def get_bel_resource_metadata():
    """Get BEL resource metadata"""

    resources = {}
    for resource in resources_metadata_coll:
        if resource.get("source_url") == "":
            resource["source_url"] = None

        resources[resource["_key"]] = resource

    return resources


def delete_namespace(namespace: str):
    """Remove namespace resources

    Removes ArangoDB terms and equivalences and deletes the Elasticsearch
    terms indexes for the namespace
    """

    remove_old_db_entries(namespace, force=True)

    es.indices.delete(index=f"{settings.TERMS_INDEX}_{namespace.lower()}_*", ignore=[400, 404])