bel/nanopub/pubmed.py

Summary

Maintainability
C
7 hrs
Test Coverage
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Pubmed related utilities

Given PMID - collect Pubmed data and Pubtator Bioconcepts used for the BELMgr
or enhancing BEL Nanopubs
"""

# Standard Library
import asyncio
import copy
import datetime
import re
from typing import Any, Mapping, MutableMapping

# Third Party
import cachetools
import httpx
from loguru import logger
from lxml import etree

# Local
import bel.core.settings as settings
import bel.terms.terms
from bel.core.utils import http_client, url_path_param_quoting

# Replace PMID
PUBMED_TMPL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?db=pubmed&retmode=xml&id="

# https://www.ncbi.nlm.nih.gov/research/pubtator-api/publications/export/biocjson?pmids=28483577,28483578,28483579

PUBTATOR_URL = (
    "https://www.ncbi.nlm.nih.gov/research/pubtator-api/publications/export/biocjson?pmids="
)

pubtator_ns_convert = {
    "CHEBI": "CHEBI",
    "Species": "TAX",
    "Gene": "EG",
    "Chemical": "MESH",
    "Disease": "MESH",
}

pubtator_entity_convert = {"Chemical": "Abundance", "Gene": "Gene", "Disease": "Pathology"}
pubtator_annotation_convert = {"Disease": "Pathology"}
pubtator_known_types = [key for key in pubtator_ns_convert.keys()]


def node_text(node):
    """Needed for things like abstracts which have internal tags (see PMID:27822475)"""

    if node.text:
        result = node.text
    else:
        result = ""
    for child in node:
        if child.tail is not None:
            result += child.tail
    return result


@cachetools.cached(cachetools.TTLCache(maxsize=200, ttl=3600))
def get_pubtator_url(pmid):
    """Get pubtator content from url"""

    pubtator = None

    url = f"{PUBTATOR_URL}{pmid}"

    r = http_client.get(url, timeout=10)

    if r and r.status_code == 200:
        pubtator = r.json()

    else:
        logger.error(f"Cannot access Pubtator, status: {r.status_code} url: {url}")

    return pubtator


def pubtator_convert_to_key(annotation: dict) -> str:
    """Convert pubtator annotation info to key (NS:ID)"""

    ns = pubtator_ns_convert.get(annotation["infons"]["type"], None)
    id_ = annotation["infons"]["identifier"]
    id_ = id_.replace("MESH:", "")

    if ns is None:
        logger.warning("")
    return f"{ns}:{id_}"


def get_pubtator(pmid):
    """Get Pubtator Bioconcepts from Pubmed Abstract

    Re-configure the denotations into an annotation dictionary format
    and collapse duplicate terms so that their spans are in a list.
    """

    annotations = []

    pubtator = get_pubtator_url(pmid)
    if pubtator is None:
        return annotations

    known_types = ["CHEBI", "Chemical", "Disease", "Gene", "Species"]

    for passage in pubtator["passages"]:
        for annotation in passage["annotations"]:
            if annotation["infons"]["type"] not in known_types:
                continue

            key = pubtator_convert_to_key(annotation)

            annotations.append(
                {
                    "key": key,
                    "text": annotation["text"],
                    "locations": copy.copy(annotation["locations"]),
                }
            )

    return annotations


def process_pub_date(year, mon, day, medline_date):
    """Create pub_date from what Pubmed provides in Journal PubDate entry"""

    if medline_date:
        year = "0000"
        match = re.search(r"\d{4,4}", medline_date)
        if match:
            year = match.group(0)

        if year and re.match("[a-zA-Z]+", mon):
            pub_date = datetime.datetime.strptime(f"{year}-{mon}-{day}", "%Y-%b-%d").strftime(
                "%Y-%m-%d"
            )
        elif year:
            pub_date = f"{year}-{mon}-{day}"

    else:
        pub_date = None
        if year and re.match("[a-zA-Z]+", mon):
            pub_date = datetime.datetime.strptime(f"{year}-{mon}-{day}", "%Y-%b-%d").strftime(
                "%Y-%m-%d"
            )
        elif year:
            pub_date = f"{year}-{mon}-{day}"

    return pub_date


def parse_book_record(doc: dict, root) -> dict:
    """Parse Pubmed Book entry"""

    doc["title"] = next(iter(root.xpath("//BookTitle/text()")))

    doc["authors"] = []
    for author in root.xpath("//Author"):
        last_name = next(iter(author.xpath("LastName/text()")), "")
        first_name = next(iter(author.xpath("ForeName/text()")), "")
        initials = next(iter(author.xpath("Initials/text()")), "")
        if not first_name and initials:
            first_name = initials
        doc["authors"].append(f"{last_name}, {first_name}")

    pub_year = next(iter(root.xpath("//Book/PubDate/Year/text()")), None)
    pub_mon = next(iter(root.xpath("//Book/PubDate/Month/text()")), "Jan")
    pub_day = next(iter(root.xpath("//Book/PubDate/Day/text()")), "01")
    medline_date = next(iter(root.xpath("//Journal/JournalIssue/PubDate/MedlineDate/text()")), None)

    pub_date = process_pub_date(pub_year, pub_mon, pub_day, medline_date)

    doc["pub_date"] = pub_date

    for abstracttext in root.xpath("//Abstract/AbstractText"):
        abstext = node_text(abstracttext)

        label = abstracttext.get("Label", None)
        if label:
            doc["abstract"] += f"{label}: {abstext}\n"
        else:
            doc["abstract"] += f"{abstext}\n"

    doc["abstract"] = doc["abstract"].rstrip()

    return doc


def parse_journal_article_record(doc: dict, root) -> dict:
    """Parse Pubmed Journal Article record"""

    doc["title"] = next(iter(root.xpath("//ArticleTitle/text()")), "")

    # TODO https://stackoverflow.com/questions/4770191/lxml-etree-element-text-doesnt-return-the-entire-text-from-an-element
    atext = next(iter(root.xpath("//Abstract/AbstractText/text()")), "")

    for abstracttext in root.xpath("//Abstract/AbstractText"):
        abstext = node_text(abstracttext)

        label = abstracttext.get("Label", None)
        if label:
            doc["abstract"] += f"{label}: {abstext}\n"
        else:
            doc["abstract"] += f"{abstext}\n"

    doc["abstract"] = doc["abstract"].rstrip()

    doc["authors"] = []
    for author in root.xpath("//Author"):
        last_name = next(iter(author.xpath("LastName/text()")), "")
        first_name = next(iter(author.xpath("ForeName/text()")), "")
        initials = next(iter(author.xpath("Initials/text()")), "")
        if not first_name and initials:
            first_name = initials
        doc["authors"].append(f"{last_name}, {first_name}")

    pub_year = next(iter(root.xpath("//Journal/JournalIssue/PubDate/Year/text()")), None)
    pub_mon = next(iter(root.xpath("//Journal/JournalIssue/PubDate/Month/text()")), "Jan")
    pub_day = next(iter(root.xpath("//Journal/JournalIssue/PubDate/Day/text()")), "01")
    medline_date = next(iter(root.xpath("//Journal/JournalIssue/PubDate/MedlineDate/text()")), None)

    pub_date = process_pub_date(pub_year, pub_mon, pub_day, medline_date)

    doc["pub_date"] = pub_date
    doc["journal_title"] = next(iter(root.xpath("//Journal/Title/text()")), "")
    doc["joural_iso_title"] = next(iter(root.xpath("//Journal/ISOAbbreviation/text()")), "")
    doc["doi"] = next(iter(root.xpath('//ArticleId[@IdType="doi"]/text()')), None)

    doc["compounds"] = []
    for chem in root.xpath("//ChemicalList/Chemical/NameOfSubstance"):
        chem_id = chem.get("UI")
        doc["compounds"].append({"key": f"MESH:{chem_id}", "label": chem.text})

    compounds = [cmpd["key"] for cmpd in doc["compounds"]]
    doc["mesh"] = []
    for mesh in root.xpath("//MeshHeading/DescriptorName"):
        mesh_id = f"MESH:{mesh.get('UI')}"
        if mesh_id in compounds:
            continue
        doc["mesh"].append({"key": mesh_id, "label": mesh.text})

    return doc


@cachetools.cached(cachetools.TTLCache(maxsize=200, ttl=3600))
def get_pubmed_url(pmid):
    """Get pubmed url"""

    root = None

    try:
        pubmed_url = f"{PUBMED_TMPL}{str(pmid)}"

        r = http_client.get(pubmed_url)

        logger.info(f"Status {r.status_code}  URL: {pubmed_url}")

        if r.status_code == 200:
            content = r.content
            root = etree.fromstring(content)
        else:
            logger.warning(f"Could not download pubmed url: {pubmed_url}")

    except Exception as e:
        logger.warning(
            f"Bad Pubmed request, error: {str(e)}",
            url=f'{PUBMED_TMPL.replace("PMID", pmid)}',
        )

    return root


def get_pubmed(pmid: str) -> Mapping[str, Any]:
    """Get pubmed xml for pmid and convert to JSON

    Remove MESH terms if they are duplicated in the compound term set

    ArticleDate vs PubDate gets complicated: https://www.nlm.nih.gov/bsd/licensee/elements_descriptions.html see <ArticleDate> and <PubDate>
    Only getting pub_year at this point from the <PubDate> element.

    Args:
        pmid: pubmed id number as a string

    Returns:
        pubmed json
    """

    doc = {
        "abstract": "",
        "pmid": pmid,
        "title": "",
        "authors": [],
        "pub_date": "",
        "journal_iso_title": "",
        "journal_title": "",
        "doi": "",
        "compounds": [],
        "mesh": [],
    }

    root = get_pubmed_url(pmid)

    if root is None:
        return None

    try:
        doc["pmid"] = root.xpath("//PMID/text()")[0]
    except Exception as e:
        return None

    if doc["pmid"] != pmid:
        logger.error(f"Requested PMID {doc['pmid']}doesn't match record PMID {pmid}")

    if root.find("PubmedArticle") is not None:
        doc = parse_journal_article_record(doc, root)
    elif root.find("PubmedBookArticle") is not None:
        doc = parse_book_record(doc, root)

    return doc


async def async_get_normalized_terms_for_annotations(term_keys):
    """Async collection of normalized terms for annotations"""

    normalized = asyncio.gather(
        *[bel.terms.terms.async_get_normalized_terms(term_key) for term_key in term_keys]
    )

    return normalized


def get_normalized_terms_for_annotations(term_keys):

    return [bel.terms.terms.get_normalized_terms(term_key) for term_key in term_keys]


def add_annotations(pubmed):
    """Add nanopub annotations to pubmed doc

    Enhance MESH terms etc as full-fledged nanopub annotations for use by the BEL Nanopub editor
    """

    term_keys = (
        [entry["key"] for entry in pubmed.get("compounds", [])]
        + [entry["key"] for entry in pubmed.get("mesh", [])]
        + [entry["key"] for entry in pubmed.get("pubtator", [])]
    )
    term_keys = list(set(term_keys))

    terms = {}

    for entry in pubmed.get("pubtator", []):
        terms[entry["key"]] = {"key": entry["key"], "label": entry["text"]}

    for entry in pubmed.get("compounds", []):
        terms[entry["key"]] = {"key": entry["key"], "label": entry["label"]}

    for entry in pubmed.get("mesh", []):
        terms[entry["key"]] = {"key": entry["key"], "label": entry["label"]}

    # loop = asyncio.get_event_loop()
    # normalized = loop.run_until_complete(async_get_normalized_terms_for_annotations(term_keys))

    normalized = get_normalized_terms_for_annotations(terms.keys())

    normalized = sorted(normalized, key=lambda x: x["annotation_types"], reverse=True)

    pubmed["annotations"] = []

    for annotation in normalized:

        # HACK - only show first annotation type
        if len(annotation["annotation_types"]) > 0:
            annotation_type = annotation["annotation_types"][0]
        else:
            annotation_type = ""

        if annotation.get("label", False):
            terms[annotation["original"]]["key"] = annotation["decanonical"]
            terms[annotation["original"]]["label"] = annotation["label"]
            terms[annotation["original"]]["annotation_types"] = [annotation_type]

    pubmed["annotations"] = copy.deepcopy(
        sorted(terms.values(), key=lambda x: x.get("annotation_types", []), reverse=True)
    )

    # Add missing
    for idx, annotation in enumerate(pubmed["annotations"]):
        if annotation["label"] == "":
            pubmed["annotations"][idx]["label"] = annotation["key"]

    return pubmed


def get_pubmed_for_beleditor(pmid: str, pubmed_only: bool = False) -> Mapping[str, Any]:
    """Get fully annotated pubmed doc with Pubtator and full entity/annotation_types

    Args:
        pmid: Pubmed PMID

    Returns:
        Mapping[str, Any]: pubmed dictionary
    """

    pubmed = get_pubmed(pmid)

    if pubmed is None:
        return pubmed

    if not pubmed_only:
        pubmed["pubtator"] = get_pubtator(pmid)

    # Add entity types and annotation types to annotations
    pubmed = add_annotations(pubmed)

    return pubmed


def main():

    pmid = "19894120"

    pubmed = get_pubmed_for_beleditor(pmid)


if __name__ == "__main__":
    main()