CIMAC-CIDC/cidc-schemas

View on GitHub
cidc_schemas/util.py

Summary

Maintainability
A
2 hrs
Test Coverage
A
98%
import os
import re
import jinja2
from typing import List, Union

JSON = Union[dict, float, int, list, str]

from deepdiff import grep


def load_pipeline_config_template(name: str) -> jinja2.Template:
    curdir = os.path.dirname(os.path.abspath(__file__))
    loader = jinja2.FileSystemLoader(curdir)
    return jinja2.Environment(loader=loader).get_template(
        f"pipeline_configs/{name}.yaml.j2"
    )


def participant_id_from_cimac(cimac_id: str) -> str:
    assert len(cimac_id) == len("CTTTPPPSS.00")
    return cimac_id[:7]


def get_all_paths(ct: dict, key: str, dont_throw=False) -> List[str]:
    """
    find all paths to the given key in the dictionary

    Args:
        ct: clinical_trial object to be modified
        key: the identifier we are looking for in the dictionary

    Throws:
        KeyError if *key* is not found within *ct*

    Returns:
        arg1: string describing the location of the key
        in python-ish access style: "root['access']['path'][0]['something']"
    """

    # first look for key as is
    ds1 = ct | grep(key, match_string=True, case_sensitive=True)
    count1 = 0
    if "matched_values" in ds1:
        count1 = len(ds1["matched_values"])

    # the hack fails if both work... probably need to deal with this
    if count1 == 0:
        if dont_throw:
            return []
        else:
            raise KeyError(f"key: {key} not found")

    return ds1["matched_values"]


def get_path(ct: dict, key: str) -> str:

    all_paths = get_all_paths(ct, key)

    return all_paths.pop()


def get_file_ext(fname):
    return (fname.rsplit(".")[-1]).lower()


def split_python_style_path(path: str) -> list:
    """
    Will parse `get_path` output (`root['some']['access']['path']`)
    to a list of typed (int/str) tokens

    >>> list(split_python_style_path("root['some']['access']['path']"))
    ['some', 'access', 'path']

    >>> list(split_python_style_path("root['with'][0]['integers'][1]['too']"))
    ['with', 0, 'integers', 1, 'too']

    """

    # strip "root[]"
    assert path.startswith("root[")
    path = path[4:]
    # tokenize
    for groups in re.findall(r"\[(([0-9]*)|'([^\]]+)')\]", path):
        yield groups[2] or int(groups[1])


def get_source(ct: dict, key: str, skip_last=None) -> (JSON, JSON):
    """
    extract the object in the dictionary specified by
    the supplied key (or one of its parents.)

    Args:
        ct: clinical_trial object to be searched
        key: the identifier we are looking for in the dictionary,
        skip_last: how many levels at the end of key path we want to skip.
    Returns:
        arg1: the value present in `ct` at the `key` path
        arg2: extra metadata collected while descending down `key` path
    Raises:
        ValueError if `key` doesn't exist in `ct`
    """

    tokens = list(split_python_style_path(key))

    if skip_last:
        last_idx = -1 * skip_last
        last_token = tokens[last_idx]
        tokens = tokens[0:last_idx]
    else:
        last_token = tokens[-1]

    extra_metadata = {}

    def _update_extra_metadata(token, level, namespace):
        if not isinstance(level, dict):
            return namespace
        # We collect every primitive value present on `level`
        # with keys that aren't the `token` key we are looking for.
        # If we are on the last token, we're at the artifact-specific
        # metadata level, so we collect all values, not just primitives.
        is_artifact_specific = token == last_token
        for k, v in level.items():
            if k != token:
                if is_artifact_specific or isinstance(v, (int, float, str)):
                    prop_name = f"{namespace}.{k}" if namespace else k
                    extra_metadata[prop_name] = v
        return f"{namespace}.{token}" if namespace else token

    # Iteratively follow the key path down into the dictionary
    cur_obj = ct
    namespace = ""
    for token in tokens:
        namespace = _update_extra_metadata(token, cur_obj, namespace)
        try:
            cur_obj = cur_obj[token]
        except Exception:
            raise ValueError(f"{token} not found in {namespace}")

    # Extract extra_metadata from the last level if it was skipped
    if skip_last:
        _update_extra_metadata(last_token, cur_obj, namespace)

    return cur_obj, extra_metadata