justaddcoffee/kg-emerging-viruses
kg_covid_19/transform_utils/ttd/ttd.py

"""Transform TTD data."""

import collections
import logging
import os
import re
from typing import List, Optional, Union

from kg_covid_19.transform_utils.transform import Transform
from kg_covid_19.utils import normalize_curies, write_node_edge_item
from kg_covid_19.utils.transform_utils import (ItemInDictNotFoundError,
                                               get_item_by_priority,
                                               uniprot_make_name_to_id_mapping)

"""Ingest TTD - Therapeutic Targets Database
drug targets, and associated data for each (drugs, ids, etc)

Dataset location:
http://db.idrblab.net/ttd/sites/default/files/ttd_database/P1-01-TTD_target_download.txt
GitHub Issue:
https://github.com/Knowledge-Graph-Hub/kg-covid-19/issues/6
"""


class TTDNotEnoughFieldsError(Exception):
    """Exception raised when number of fields is less than expected."""

    pass


class TTDTransform(Transform):
    """Transforms TTD data."""

    def __init__(
        self, input_dir: Optional[str] = None, output_dir: Optional[str] = None
    ):
        """Initialize."""
        source_name = "ttd"
        super().__init__(source_name, input_dir, output_dir)

    def run(self, data_file: Optional[str] = None):
        """Run the transformation."""
        ttd_file_name = os.path.join(
            self.input_base_dir, "P1-01-TTD_target_download.txt"
        )
        ttd_data = self.parse_ttd_file(ttd_file_name)
        gene_node_type = "biolink:Protein"
        drug_id_prefix = "ttd.drug:"
        drug_node_type = "biolink:Drug"
        drug_gene_edge_label = "biolink:interacts_with"
        drug_gene_edge_relation = "RO:0002436"  # molecularly interacts with
        uniprot_curie_prefix = "UniProtKB:"
        self.node_header = ["id", "name", "category", "TTD_ID", "provided_by"]
        self.edge_header = [
            "subject",
            "predicate",
            "object",
            "relation",
            "provided_by",
            "type",
            "target_type",
        ]

        # make name to id map for uniprot names of human proteins
        dat_gz_id_file = os.path.join(
            self.input_base_dir, "HUMAN_9606_idmapping.dat.gz"
        )
        name_2_id_map = uniprot_make_name_to_id_mapping(dat_gz_id_file)

        # transform data: write gene and drug nodes, and drug-gene edges
        with open(self.output_node_file, "w") as node, open(
            self.output_edge_file, "w"
        ) as edge:

            # write node and edge headers
            node.write("\t".join(self.node_header) + "\n")
            edge.write("\t".join(self.edge_header) + "\n")

            # Collect all TTD drug CURIEs and normalize them against the
            # DrugCentral SSSOM map; the result is flattened into a dict of
            # original CURIE -> normalized CURIE (ttd_drug_map)
            all_ttd_drugs = []
            for _, data in ttd_data.items():
                if "UNIPROID" not in data:
                    continue
                if "DRUGINFO" not in data:
                    continue
                for this_drug in data["DRUGINFO"]:
                    this_drug_curie = drug_id_prefix + this_drug[0]
                    all_ttd_drugs.append(
                        {"orig_id": this_drug_curie, "id": this_drug_curie}
                    )
            normalized_ttd_drugs = normalize_curies(
                map_path="./maps/drugcentral-maps-kg_covid_19-0.1.sssom.tsv",
                entries=all_ttd_drugs,
            )
            ttd_drug_map = {
                entry["orig_id"]: entry["id"] for entry in normalized_ttd_drugs
            }

            for target_id, data in ttd_data.items():
                # WRITE NODES

                # skip targets that lack a UNIPROID (UniProt) entry or drug info
                if "UNIPROID" not in data:
                    continue
                if "DRUGINFO" not in data:
                    continue

                #
                # make node for gene(s)
                #
                uniproids: list = self.get_uniproids(
                    data, name_2_id_map, uniprot_curie_prefix
                )
                gene_name = self.get_gene_name(data)

                # gene node: ['id', 'name', 'category', 'TTD_ID', 'provided_by']
                for this_id in uniproids:
                    write_node_edge_item(
                        fh=node,
                        header=self.node_header,
                        data=[
                            this_id,
                            gene_name,
                            gene_node_type,
                            target_id,
                            self.source_name,
                        ],
                    )

                # for each drug in DRUGINFO:
                for this_drug in data["DRUGINFO"]:
                    this_drug_curie = ttd_drug_map[drug_id_prefix + this_drug[0]]
                    #
                    # make node for drug
                    #
                    write_node_edge_item(
                        fh=node,
                        header=self.node_header,
                        data=[
                            this_drug_curie,
                            this_drug[1],
                            drug_node_type,
                            this_drug[0],
                            self.source_name,
                        ],
                    )

                    #
                    # make edges for target gene ids <-> drug
                    #
                    targ_type = self.get_targ_type(data)

                    # ['subject', 'predicate', 'object', 'relation',
                    #  'provided_by', 'type', 'target_type']
                    for this_id in uniproids:
                        write_node_edge_item(
                            fh=edge,
                            header=self.edge_header,
                            data=[
                                this_drug_curie,
                                drug_gene_edge_label,
                                this_id,
                                drug_gene_edge_relation,
                                self.source_name,
                                "biolink:Association",
                                targ_type,
                            ],
                        )

    def get_uniproids(
        self, data: dict, name_2_id_map: dict, uniprot_curie_prefix: str
    ) -> List[str]:
        """Get a list of UniProtKB IDs from names."""
        ids = []
        try:
            uniproid_struct = get_item_by_priority(data, ["UNIPROID"])
            uniprot_names = uniproid_struct[0]
            if isinstance(uniprot_names, str):
                # parse_line() returns a bare string when a target has only one
                # UniProt name; wrap it so we iterate over names, not characters
                uniprot_names = [uniprot_names]

            for this_name in uniprot_names:
                # use the UniProtKB accession if we can find one for this name
                if this_name in name_2_id_map:
                    ids.append(uniprot_curie_prefix + name_2_id_map[this_name])
        except ItemInDictNotFoundError:
            logging.warning("Problem with UNIPROID for this target: {}".format(data))
        return ids

    def get_gene_name(self, data: dict) -> str:
        """Get the name of a gene."""
        gene_name = ""
        try:
            gene_names = get_item_by_priority(data, ["GENENAME"])
            gene_name = gene_names[0]
        except ItemInDictNotFoundError:
            logging.warning("Problem with GENENAME for this target: {}".format(data))
        return gene_name

    def get_targ_type(self, data: dict) -> str:
        """Get the target type for a target."""
        targ_type = ""
        try:
            targ_types = get_item_by_priority(data, ["TARGTYPE"])
            targ_type = targ_types[0]
        except ItemInDictNotFoundError:
            pass
        return targ_type

    def parse_ttd_file(self, file: str) -> dict:
        """Parse entire TTD download file.

        (a few megs, not very mem efficient, but
        should be okay), and return a dict of dicts of lists
        [target_id] -> [abbreviation] -> [list with data]
        where 'abbreviation' is one of:
        ['TARGETID', 'FORMERID', 'UNIPROID', 'TARGNAME', 'GENENAME', 'TARGTYPE',
         'SYNONYMS', 'FUNCTION', 'PDBSTRUC', 'BIOCLASS', 'ECNUMBER', 'SEQUENCE',
         'DRUGINFO', 'KEGGPATH', 'WIKIPATH', 'WHIZPATH', 'REACPATH', 'NET_PATH',
         'INTEPATH', 'PANTPATH', 'BIOCPATH']
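
        An illustrative entry (the target ID, protein names, and drug values
        here are made up):
            parsed_data["T00001"]["UNIPROID"]
                -> [["ACE2_HUMAN", "TMPS2_HUMAN"]]
            parsed_data["T00001"]["DRUGINFO"]
                -> [["D0X1YZ", "Example drug", "Approved"], ...]
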
        :param file: path to P1-01-TTD_target_download.txt
        :return: dict of dicts of lists, keyed by target ID and then abbreviation
        """
        parsed_data = collections.defaultdict(dict)  # type: ignore

        # wish they'd make this file easier to parse
        seen_dashed_lines = 0
        dashed_line_re = re.compile(r"^-+\n")
        blank_line_re = re.compile(r"^\s*$")

        with open(file, "r") as fh:
            for line in fh:
                if dashed_line_re.match(line):
                    seen_dashed_lines = seen_dashed_lines + 1
                    continue

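                # the file's header block ends at the second dashed separator
                # line; skip everything before it, and skip blank lines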
                if seen_dashed_lines < 2 or blank_line_re.match(line):
                    continue

                (target_id, abbrev, data_list) = self.parse_line(line)

                # parsed_data is a defaultdict, so the per-target dict is
                # created on first access; setdefault creates the per-abbrev list
                parsed_data[target_id].setdefault(abbrev, []).append(data_list)

        return parsed_data

    def parse_line(self, line: str, id_sep="; ") -> list:
        r"""Parse one line of data from P1-01-TTD_target_download.

        Return list of:
        [target_id, abbrev, data_list]
        where:
        target_id is the target_id
        abbrev is a member of 'TARGETID', 'FORMERID', etc] (see above)
        data_list is a list of all items in field3 ... last field, split on '\t'
        :param line: line from P1-01-TTD_target_download
        :return: [target_id, abbrev, data_list]
        :param id_sep: character string that separates ID strings, as in ID1; ID2 ["; "]
        """
        fields = line.rstrip().split("\t")
        if len(fields) < 3:
            raise TTDNotEnoughFieldsError("Not enough fields in line {}".format(line))
        target_id = fields[0]
        abbrev = fields[1]

        data: Union[List, str]
        if len(fields[2:]) == 1:
            if fields[1] == "UNIPROID" and id_sep in fields[2]:
                data = fields[2].split(id_sep)
            else:
                data = fields[2]
        else:
            data = fields[2:]

        return [target_id, abbrev, data]
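

# Minimal usage sketch (illustrative only; the paths, target ID, and protein
# names below are assumptions, not taken from TTD data):
#
#     from kg_covid_19.transform_utils.ttd.ttd import TTDTransform
#
#     t = TTDTransform(input_dir="data/raw", output_dir="data/transformed")
#     t.run()  # reads P1-01-TTD_target_download.txt and
#              # HUMAN_9606_idmapping.dat.gz from the input directory, then
#              # writes node and edge TSV files to the output directory
#
#     t.parse_line("T00001\tUNIPROID\tACE2_HUMAN; TMPS2_HUMAN")
#     # -> ["T00001", "UNIPROID", ["ACE2_HUMAN", "TMPS2_HUMAN"]]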