bel/resources/ortholog.py

Summary

Maintainability
A
2 hrs
Test Coverage
# Standard Library
import copy
import gzip
import json
from collections import defaultdict
from typing import IO, Mapping, Optional

# Third Party
from arango import ArangoError
from loguru import logger

# Local
import bel.core.settings as settings
import bel.core.utils
import bel.db.arangodb as arangodb
from bel.db.arangodb import (
    ortholog_edges_name,
    ortholog_nodes_name,
    resources_db,
    resources_metadata_coll,
)


def remove_old_db_entries(source, version: str = "", force: bool = False):
    """Purge stale ortholog edges and nodes belonging to one source

    Args:
        source: value compared against each document's ``source`` field
        version: documents carrying this version are kept; when empty,
            documents of every version are removed
        force: when true, remove documents of every version regardless of
            ``version``
    """

    # An empty clause means no version restriction, i.e. remove everything
    # recorded for this source
    version_clause = "" if force or version == "" else f'FILTER doc.version != "{version}"'

    # Same query shape for both collections; edges are purged before nodes
    for collection_name in (ortholog_edges_name, ortholog_nodes_name):
        purge_query = f"""
        FOR doc in {collection_name}
            FILTER doc.source == "{source}"
            {version_clause}
            REMOVE doc IN {collection_name}
    """
        arangodb.aql_query(resources_db, purge_query)


def load_orthologs(
    fo: IO, metadata: dict, force: bool = False, resource_download_url: Optional[str] = None
):
    """Load orthologs into ArangoDB

    The dataset is loaded only when ``force`` is set or the version recorded
    in the resource metadata collection differs from ``metadata["version"]``.
    After loading, older entries for the same source are removed and the
    metadata document is (re)written - unless the new dataset contains fewer
    orthologs than the previously recorded one, in which case the old entries
    and the old metadata are left in place and the result is marked "Failed".

    Args:
        fo: file obj - orthologs file
        metadata: dict containing the metadata for orthologs; must provide
            "name" and "version". It is updated in place with "_key",
            "statistics" and, when given, "resource_download_url" before
            being stored.
        force: load even if this version is already recorded as loaded
        resource_download_url: when not None, stored in the metadata document

    Returns:
        dict with "state" ("Succeeded" or "Failed") and "messages" (list of
        human-readable strings)

    Raises:
        KeyError: if ``metadata`` lacks "name" or "version"
    """

    result = {"state": "Succeeded", "messages": []}

    statistics = {"entities_count": 0, "orthologous_pairs": defaultdict(lambda: defaultdict(int))}

    version = metadata["version"]
    source = metadata["name"]

    # The metadata document is stored under the ArangoDB-safe form of the
    # name (see the insert below), so the lookup has to use that same key
    metadata_key = arangodb.arango_id_to_key(source)
    prior_metadata = resources_metadata_coll.get(metadata_key)

    # No prior document (or one without usable statistics) is treated as
    # "nothing loaded before"
    try:
        prior_version = prior_metadata.get("version", "")
        prior_entity_count = prior_metadata["statistics"].get("entities_count", 0)
    except (AttributeError, KeyError, TypeError):
        prior_entity_count = 0
        prior_version = ""

    if not force and prior_version == version:
        msg = f"NOTE: This orthology dataset {source} at version {version} is already loaded and the 'force' option was not used"
        result["messages"].append(msg)
        return result

    # orthologs_iterator fills in `statistics` as the documents are consumed
    arangodb.batch_load_docs(
        resources_db, orthologs_iterator(fo, version, statistics), on_duplicate="update"
    )

    logger.info(
        f"Loaded orthologs, source: {source}  count: {statistics['entities_count']}", source=source
    )

    # A shrinking dataset is suspicious: keep the old entries and metadata
    if prior_entity_count > statistics["entities_count"]:
        msg = f"Error: This orthology dataset {source} at version {version} has fewer orthologs than previously loaded orthology dataset. Skipped removing old ortholog entries"
        logger.error(msg)

        result["state"] = "Failed"
        result["messages"].append(msg)
        return result

    remove_old_db_entries(source, version=version)

    # Add metadata to resource metadata collection
    metadata["_key"] = metadata_key

    # Using side effect to get statistics from orthologs_iterator on purpose
    metadata["statistics"] = copy.deepcopy(statistics)

    if resource_download_url is not None:
        metadata["resource_download_url"] = resource_download_url

    resources_metadata_coll.insert(metadata, overwrite=True)

    result["messages"].append(f'Loaded {statistics["entities_count"]} ortholog sets into arangodb')
    return result


def orthologs_iterator(fo, version, statistics: dict):
    """Ortholog node and edge iterator

    Reads ``fo`` from the beginning as JSON lines. A record containing a
    "metadata" key sets the source name applied to the records that follow;
    a record containing an "ortholog" key produces a subject node, an object
    node and the edge joining them. Other records and blank lines are ignored.

    An ortholog record is skipped when any of its key/species fields is
    absent or empty, or when settings.BEL_FILTER_SPECIES is non-empty and
    either species is not listed in it.

    NOTE: the statistics dict works as a side effect since it is passed as a reference!!!

    Args:
        fo: seekable file object yielding one JSON document per line
        version: version value stamped on every node and edge document
        statistics: updated in place; "entities_count" is incremented and the
            nested "orthologous_pairs" counters are incremented in both
            directions for every edge yielded

    Yields:
        (collection_name, document) tuples: subject node, object node, then edge

    Raises:
        ValueError: if an ortholog record would be emitted before any
            metadata record has provided the source name
    """

    species_list = settings.BEL_FILTER_SPECIES

    # Set by the metadata record, which must precede the ortholog records
    source = None

    fo.seek(0)

    for line in fo:
        # Tolerate blank lines (e.g. a trailing newline) rather than failing
        # inside json.loads
        if not line.strip():
            continue

        edge = json.loads(line)
        if "metadata" in edge:
            source = edge["metadata"]["name"]
            continue

        if "ortholog" not in edge:
            continue

        ortholog = edge["ortholog"]

        subject_key = ortholog.get("subject_key")
        subject_species_key = ortholog.get("subject_species_key")
        object_key = ortholog.get("object_key")
        object_species_key = ortholog.get("object_species_key")

        # Skip if any values are missing (absent fields count as missing too)
        if not all((subject_key, subject_species_key, object_key, object_species_key)):
            continue

        # Skip if species_key not listed in species_list
        if species_list and (
            subject_species_key not in species_list or object_species_key not in species_list
        ):
            continue

        if source is None:
            raise ValueError(
                "Ortholog record found before any metadata record - cannot determine source"
            )

        # Simple lexical sorting (e.g. not numerical) to ensure 1 entry per pair
        if subject_key > object_key:
            subject_key, subject_species_key, object_key, object_species_key = (
                object_key,
                object_species_key,
                subject_key,
                subject_species_key,
            )

        # Convert to ArangoDB legal chars for arangodb _key
        subject_db_key = arangodb.arango_id_to_key(subject_key)
        object_db_key = arangodb.arango_id_to_key(object_key)

        # Subject node
        yield (
            ortholog_nodes_name,
            {
                "_key": subject_db_key,
                "key": subject_key,
                "species_key": subject_species_key,
                "source": source,
                "version": version,
            },
        )
        # Object node
        yield (
            ortholog_nodes_name,
            {
                "_key": object_db_key,
                "key": object_key,
                "species_key": object_species_key,
                "source": source,
                "version": version,
            },
        )

        arango_edge = {
            "_from": f"{ortholog_nodes_name}/{subject_db_key}",
            "_to": f"{ortholog_nodes_name}/{object_db_key}",
            "_key": bel.core.utils._create_hash(f"{subject_key}>>{object_key}"),
            "type": "ortholog_to",
            "source": source,
            "version": version,
        }

        # Counted in both directions so either species can be used as the
        # outer lookup key
        statistics["entities_count"] += 1
        statistics["orthologous_pairs"][subject_species_key][object_species_key] += 1
        statistics["orthologous_pairs"][object_species_key][subject_species_key] += 1

        yield (arangodb.ortholog_edges_name, arango_edge)


def delete_ortholog_resource(source) -> None:
    """Remove ortholog resource

    Removes every ortholog edge and node document whose ``source`` field
    matches ``source``, regardless of version, by calling
    remove_old_db_entries with force=True. This function makes no other
    removals.

    Args:
        source: value matched against the ``source`` field of the ortholog
            documents
    """

    remove_old_db_entries(source, force=True)