justaddcoffee/kg-emerging-viruses

View on GitHub
kg_covid_19/utils/transform_utils.py

Summary

Maintainability
A
25 mins
Test Coverage
D
67%
"""Utilities for assisting data transformations."""
import gzip
import logging
import os
import re
import shutil
import zipfile
from typing import Any, Dict, List, Union

from tqdm import tqdm  # type: ignore


class TransformError(Exception):
    """Base class for other exceptions."""

    pass


class ItemInDictNotFoundError(TransformError):
    """Raised when the input value is too small."""

    pass


# TODO: option to further refine typing of method arguments below.


def multi_page_table_to_list(multi_page_table: Any) -> List[Dict]:
    """
    Convert multi-page tables to lists of dicts.

    Method to turn table data returned from tabula.io.read_pdf(),
    possibly broken over several pages, into a list
    of dicts, one dict for each row.
    Args:
        multi_page_table:
    Returns:
        table_data: A list of dicts, where each dict is item from one row.
    """
    # iterate through data for each of 3 pages
    table_data: List[Dict] = []

    header_items = get_header_items(multi_page_table[0])

    for this_page in multi_page_table:
        for row in this_page["data"]:
            if len(row) != 4:
                logging.warning("Unexpected number of rows in {}".format(row))

            items = [d["text"] for d in row]
            this_dict = dict(zip(header_items, items))
            table_data.append(this_dict)

    return table_data


def get_header_items(table_data: Any) -> List:
    """Get header from (first page of) a table.

    Args:
        table_data: Data, as list of dicts from tabula.io.read_pdf().
    Returns:
        header_items: An array of header items.
    """
    header = table_data["data"].pop(0)
    header_items = [d["text"] for d in header]

    return header_items


def write_node_edge_item(fh: Any, header: List, data: List, sep: str = "\t"):
    r"""
    Write out a single line for a node or an edge in *.tsv.

    :param fh: file handle of node or edge file
    :param header: list of header items
    :param data: data for line to write out
    :param sep: separator [\t]
    """
    if len(header) != len(data):
        raise Exception("Header and data are not the same length.")
    try:
        fh.write(sep.join(data) + "\n")
    except IOError:
        logging.warning("Can't write data for {}".format(data))


def get_item_by_priority(items_dict: dict, keys_by_priority: list) -> str:
    """
    Retrieve item from a dict using a list of keys.

    Keys should be in descending order of priority.
    :param items_dict:
    :param keys_by_priority: list of keys to use to find values
    :return: str: first value in dict for first item in keys_by_priority
    that isn't blank, or None
    """
    value = None
    for key in keys_by_priority:
        if key in items_dict and items_dict[key] != "":
            value = items_dict[key]
            break
    if value is None:
        raise ItemInDictNotFoundError(
            "Can't find item in items_dict {}".format(items_dict)
        )
    return value


def data_to_dict(these_keys, these_values) -> dict:
    """Zip up two lists to make a dict.

    :param these_keys: keys for new dict
    :param these_values: values for new dict
    :return: dictionary
    """
    return dict(zip(these_keys, these_values))


def uniprot_make_name_to_id_mapping(dat_gz_file: str) -> dict:
    """
    Convert UniProtKB id maps to dict of maps.

    Given a Uniprot dat.gz file, like this:
    ftp://ftp.uniprot.org/pub/databases/uniprot/
    current_release/knowledgebase/idmapping/by_organism/
    HUMAN_9606_idmapping.dat.gz
    makes dict with name to id mapping
    :param dat_gz_file:
    :return: dict with mapping
    """ ""
    name_to_id_map = dict()
    logging.info("Making uniprot name to id map")
    with gzip.open(dat_gz_file, mode="rb") as file:
        for line in tqdm(file):
            items = line.decode().strip().split("\t")
            name_to_id_map[items[2]] = items[0]
    return name_to_id_map


def uniprot_name_to_id(name_to_id_map: dict, name: str) -> Union[str, None]:
    """Set up Uniprot name to ID mapping.

    :param name_to_id_map: mapping dict[name] -> id
    :param name: name
    :return: id string, or None
    """
    if name in name_to_id_map:
        return name_to_id_map[name]
    else:
        return None


def parse_header(header_string: str, sep: str = "\t") -> List:
    """Parse header data from a file.

    Args:
        header_string: A string containing header items.
        sep: A string containing a delimiter.

    Returns:
        A list of header items.
    """
    header = header_string.strip().split(sep)
    return [i.replace('"', "") for i in header]


def unzip_to_tempdir(zip_file_name: str, tempdir: str) -> None:
    """Decompress a zip file into a temp directory."""
    with zipfile.ZipFile(zip_file_name, "r") as z:
        z.extractall(tempdir)


def ungzip_to_tempdir(gzipped_file: str, tempdir: str) -> str:
    """Decompress a GZIP file into a temp directory."""
    ungzipped_file = os.path.join(tempdir, os.path.basename(gzipped_file))
    if ungzipped_file.endswith(".gz"):
        ungzipped_file = os.path.splitext(ungzipped_file)[0]

    with gzip.open(gzipped_file, "rb") as f_in, open(ungzipped_file, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
    return ungzipped_file


def guess_bl_category(identifier: str) -> str:
    """Guess Biolink category for a given identifier.

    Note: This is a temporary solution
    and should not be used long term.
    Args:
        identifier: A CURIE
    Returns:
        The category for the given CURIE
    """
    prefix = identifier.split(":")[0]
    if prefix in {"UniProtKB", "ComplexPortal"}:
        category = "biolink:Protein"
    elif prefix in {"GO"}:
        category = "biolink:OntologyClass"
    else:
        category = "biolink:NamedThing"
    return category


def collapse_uniprot_curie(uniprot_curie: str) -> str:
    """
    Collapse a UniProtKB isoform ID to a parent ID.

    Given a UniProtKB curie for an
    isoform such as UniprotKB:P63151-1
    or UniprotKB:P63151-2, collapse to parent protein
    (UniprotKB:P63151 / UniprotKB:P63151)
    :param uniprot_curie:
    :return: collapsed UniProtKB ID
    """
    if re.match(r"^uniprotkb:", uniprot_curie, re.IGNORECASE):
        uniprot_curie = re.sub(r"\-\d+$", "", uniprot_curie)
    return uniprot_curie