bel/core/utils.py

Summary

Maintainability
A
35 mins
Test Coverage
"""Various utilities used throughout the BEL package"""

# Standard Library
import asyncio
import collections
import datetime
import functools
import json
import re
import tempfile
from functools import partial, wraps
from timeit import default_timer
from typing import Any, List, Mapping, Optional, Tuple

# Third Party
import dateutil
import httpx
import ulid
from cityhash import CityHash64
from loguru import logger

# Namespace Argument Regex pattern
# first section looks for NS:ID!LABEL in function - (([\w\.]+)\s*:\s*(".*?"|\w+)\s*!?\s*(".*?"|\w+)?)[\,\)]
# second section looks for a bare NS:ID!LABEL - (([\w\.]+)\s*:\s*(".*?"|\w+)\s*!?\s*(".*?"|\w+)?)
# order is important
# nsarg_pattern = re.compile(
#     r'((?P<ns>[\w\.]+)\s*:?\s*(?P<id>".*?"|\w+)\s*!?\s*(?P<label>".*?"|\w+)?)[\,\)]|((?P<ns2>[\w\.]+)\s*:\s*(?P<id2>".*?"|\w+)\s*!?\s*(?P<label2>".*?"|\w+)?)'
# )

nsarg_pattern = re.compile(
    r"""
    (?P<ns_arg>
        (?P<ns>[\w\.]+)        # namespace
        \s*:\s*                # ns:id separator
        (?P<id>".*?"|[^\,\(\)\s\!]+)      # id
        (\s*!\s*)?             # id!label separator
        (?P<label>".*?"|[^\,\(\)\s\!]+)?  # optional label
    )
    [\,\)]?                    # stop match

""",
    re.VERBOSE,
)

# Quotes pattern
escaped_quotes_pattern = re.compile(r'\\(")')
quotes_pattern = re.compile(r'(")')


def get_http_client():
    """Client for http requests"""

    return httpx.Client()


http_client = get_http_client()


def namespace_quoting(string: str) -> str:
    """Normalize NSArg ID and Label

    If needs quotes (only if it contains whitespace, comma or ')' ), make sure
    it is quoted, else remove quotes

    Also escape any internal double quotes
    """

    # Remove quotes if exist
    match = re.match(r'\s*"(.*)"\s*$', string)
    if match:
        string = match.group(1)

    string = string.strip()  # remove external whitespace

    string = string.replace('"', '"')  # quote internal double quotes

    # quote only if it contains whitespace, comma, ! or ')'
    if re.search(r"[),\!\s]", string):
        return f'"{string}"'

    return string


def split_key_label(key_label: str) -> dict:
    """Split key label into components ns:id!label"""

    match = nsarg_pattern.match(key_label)

    namespace, id_, label = "", "", ""

    if not match:
        return (namespace, id_, label)

    namespace = (match.group("ns"),)
    id_ = (match.group("id"),)
    if match.group("label"):
        label = (match.group("label"),)

    if isinstance(namespace, tuple):
        namespace = namespace[0]
    if isinstance(id_, tuple):
        id_ = id_[0]
    if isinstance(label, tuple):
        label = label[0]

    return (namespace, id_, label)


def timespan(start_time):
    """Return time in milliseconds from start_time"""

    timespan = datetime.datetime.now() - start_time
    timespan_ms = timespan.total_seconds() * 1000
    return timespan_ms


def download_file(url):
    """Download file"""

    with http_client.stream("GET", url) as response:

        fp = tempfile.NamedTemporaryFile()
        for chunk in response.iter_bytes():
            if chunk:  # filter out keep-alive new chunks
                fp.write(chunk)

        # logger.info(f'Download file - tmp file: {fp.name}  size: {fp.tell()}')
        return fp


def url_path_param_quoting(param):
    """Quote URL path parameters

    Convert '/' to _FORWARDSLASH_ - otherwise is interpreted as additional path parameter
        gunicorn processes the path prior to Falcon and interprets the
        correct quoting of %2F into a slash
    """
    return param.replace("/", "_FORWARDSLASH_")


def _create_hash_from_doc(doc: Mapping[str, Any]) -> str:
    """Create hash Id from edge record

    Args:
        edge (Mapping[str, Any]): edge record to create hash from

    Returns:
        str: Murmur3 128 bit hash
    """

    doc_string = json.dumps(doc, sort_keys=True)
    return _create_hash(doc_string)


def _create_hash(string: str) -> str:
    """Create CityHash64 bit hash of string

    Args:
        string (str): string to create CityHash64 from

    Returns:
        str: CityHash64
    """

    return str(CityHash64(string))


def _generate_id() -> str:
    """Create ULID

    See: https://github.com/ahawker/ulid

    Returns:
        str: ULID random, unique identifier
    """

    return ulid.new()


def dt_utc_formatted():
    """Create UTC ISODate formatted datetime string

    Format: YYYY-MM-DDThh:mm:ss.sssZ
    """
    return f"{datetime.datetime.utcnow().isoformat(timespec='milliseconds')}Z"


def parse_dt(dt: str):
    """Get datetime object from datetime strings"""

    return dateutil.parse(dt)


def asyncify(func):
    """Decorator to wrap sync functions to be awaitable"""

    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)

    return run


def html_wrap_span(
    string: str, pairs: List[Tuple[int, int]], css_class: Optional[str] = "accentuate"
) -> str:
    """Wrap targeted area of Assertion with html highlighting

    to visualize where the error or warning is targeted

    Args:
        string: string to insert html span - wrapping the accentuated content
        pairs: list of tuples of start/end locations in the string to wrap
        css_class: optional class to insert into the span html tag - defaults to 'accentuate'

    Returns:
        string with html spans around accentuated text
    """

    start_html_span = f'<span class="{css_class}">'
    end_html_span = "</span>"
    last_right_section = ""

    result_str = ""
    for idx, pair in enumerate(pairs):
        (left, right) = pair

        if idx == 0:
            result_str += string[0:left]
        else:
            result_str += last_right_section

        result_str += start_html_span
        result_str += string[left:right]
        result_str += end_html_span

        if idx < len(pairs) - 1:
            next_left = pairs[idx + 1][0]
            right_section = string[right:next_left]
            last_right_section = right_section
        else:
            right_section = string[right:]

        result_str += right_section

    return result_str