CIMAC-CIDC/cidc-schemas
docs/utils.py

import os
import jsonschema
from typing import Any, Dict, List, Set, Tuple

from cidc_schemas.constants import SCHEMA_DIR
from cidc_schemas.json_validation import _load_dont_validate_schema


# cache of already-loaded schemas, keyed on (schema path, as_html)
SCHEMA_STORE: Dict[Tuple[str, bool], dict] = dict()


def add_merge_pointer_to_data_store(
    root: dict, merge_pointer: str, data_store: dict
) -> Set[str]:
    """
    Updates `data_store` by nested-setting the endpoint of the pointer to the part of the schema it points to
    Also collects the union of all "required" lists encountered while descending

    Parameters
    ----------
    root: dict
        a jsonschema definition
    merge_pointer: str
        the merge_pointer to fish out of `root` with descend_dict()
    data_store: dict
        the nested dict in which to put the referenced definition
        set via nested_set(), which adds in place

    Returns
    -------
    required: Set[str]
        the set of property names that are required
    """
    required: Set[str] = set()
    descriptions: Dict[Tuple[str, ...], str] = dict()

    # break up the merge pointer into a set of keys
    # remove any array parts -- we'll always keep descending
    levels: List[str] = [
        part
        for part in merge_pointer.split("/")
        if not part.isdigit() and part not in ("-", "")
    ]
    # want to add back {"items": True} to anything that's an array
    ptr: int = 0
    array_pointers: List[List[str]] = []
    for part in merge_pointer.split("/"):
        if levels[ptr] == part:
            # there wasn't an array here we dropped
            ptr += 1
            # bail at the end, it'll inherit {"type": "array"}
            if ptr == len(levels):
                break
        else:
            # note the processed pointer up until this point
            array_pointers.append(levels[:ptr])

    root, new_required, new_descriptions = descend_dict(root, levels)
    required.update(new_required)
    descriptions.update(new_descriptions)

    # if merge_pointer points to a new item in the list
    # make sure we're all the way down and have a description
    if merge_pointer.endswith("-"):
        # updates in place
        load_subschema_from_url(root)
        required.update(root.get("required", []))

        if "properties" in root:
            root = root["properties"]
            required.update(root.get("required", []))

    # update in place instead of returning
    nested_set(data_store, levels, root)

    # for every intermediate array found before add {"items": True}
    # so the template knows this is an array and not an object
    for pointer_to_array in array_pointers:
        # unneeded at the top level since docs are for a singular upload
        if len(pointer_to_array):
            nested_set(data_store, pointer_to_array + ["items"], True)

    # for all intermediate descriptions add them in
    for pointer_to_descr, descr in descriptions.items():
        nested_set(
            data_store, list(pointer_to_descr) + ["description"], descr, overwrite=False
        )

    return required
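
# Illustrative example (added sketch, not part of the original module). The
# schema and merge_pointer below are hypothetical, but show how the nested
# data_store and the returned `required` set are built:
#
#   root = {
#       "samples": {
#           "type": "array",
#           "items": {
#               "required": ["cimac_id"],
#               "properties": {"cimac_id": {"type": "string"}},
#           },
#       }
#   }
#   data_store: dict = {}
#   required = add_merge_pointer_to_data_store(root, "/samples/0/cimac_id", data_store)
#   # data_store == {"samples": {"cimac_id": {"type": "string"}, "items": True}}
#   # required == {"cimac_id"}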


def descend_dict(
    root: dict, levels: List[str]
) -> Tuple[dict, Set[str], Dict[Tuple[str, ...], str]]:
    """
    Follows `levels` down through `root`
        handles "items", "properties", and "url"s

    Parameters
    ----------
    root: dict
        the nested dict schema to traverse
    levels: List[str]
        a series of dict keys

    Returns
    -------
    root: dict
        the final definition
    required: Set[str]
        the concatenated set of "required" property names across all layers
    descriptions: Dict[Tuple[str, ...], str]
        a mapping from json key paths to the matching description, if one exists
    """

    def _all_the_way_down(root: dict) -> bool:
        return (
            "items" not in root
            and "properties" not in root
            and (
                # see _handle_url()
                "url" not in root
                or "type" in root
                or "artifacts" in root["url"]
            )
        )

    required: Set[str] = set()
    descriptions: Dict[Tuple[str, ...], str] = dict()

    def _step_into(dic: dict, key: str) -> dict:
        # step one key deeper, while also collecting "required" and "description"
        # from the dict being stepped out of
        # (closes over the enclosing function's `root`, `levels`, and `n`)
        ret = dic[key]
        required.update(root.get("required", []))
        if "description" in root and tuple(levels[: n + 1]) not in descriptions:
            descriptions[tuple(levels[: n + 1])] = root["description"]
        return ret

    # traverse the schema using the keys
    for n, level in enumerate(levels):
        # keep going while we can
        root = _step_into(root, level)
        # descend into any items, properties, or non-artifact urls
        # single carve out for cytof source_fcs
        while not _all_the_way_down(root) and level != "source_fcs":
            if "items" in root:
                root = _step_into(root, "items")
            if "properties" in root and root["properties"]:
                root = _step_into(root, "properties")
            # updates in place
            load_subschema_from_url(root)
            required.update(root.get("required", []))
            if "items" in root:
                root = _step_into(root, "items")
            if "properties" in root:
                root = _step_into(root, "properties")

    return root, required, descriptions
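
# Illustrative example (added sketch, not part of the original module). The
# schema and keys are hypothetical; note that descend_dict indexes `root`
# directly by the first key, so `root` is typically a properties-style dict:
#
#   schema = {
#       "samples": {
#           "description": "Samples for this participant",
#           "type": "array",
#           "items": {
#               "required": ["cimac_id"],
#               "properties": {"cimac_id": {"type": "string"}},
#           },
#       }
#   }
#   leaf, required, descriptions = descend_dict(schema, ["samples"])
#   # leaf == {"cimac_id": {"type": "string"}}
#   # required == {"cimac_id"}
#   # descriptions == {("samples",): "Samples for this participant"}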


def flatten_allOf(schema: dict) -> dict:
    """
    Combines `properties` and `required` in place across all `allOf` entries if they exist

    Parameters
    ----------
    schema: dict
        a schema definition that may contain jsonschemas allOf

    Returns
    -------
    schema: dict
        input after update
    """
    if not "properties" in schema:
        schema["properties"] = {}
    if not "required" in schema:
        schema["required"] = []

    # use while in case of allOf > allOf
    while "allOf" in schema:
        for other_schema in schema.pop("allOf"):
            if "properties" in other_schema:
                schema["properties"].update(other_schema["properties"])
            if "required" in other_schema:
                schema["required"].extend(other_schema["required"])
            if "allOf" in other_schema:
                schema["allOf"] = other_schema["allOf"]

    return schema
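
# Illustrative example (added sketch, not part of the original module):
#
#   schema = {
#       "allOf": [
#           {"properties": {"a": {"type": "string"}}, "required": ["a"]},
#           {"properties": {"b": {"type": "integer"}}},
#       ]
#   }
#   flat = flatten_allOf(schema)
#   # sorted(flat["properties"]) == ["a", "b"] and flat["required"] == ["a"]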


def get_translated_merge_pointers(context: str, definition: dict) -> Set[str]:
    """
    Get the set of translated merge pointers from a preamble_rows or data_columns definition
    Also handles process_as, and so can return many merge pointers

    Parameters
    ----------
    context: str
        absolute pointer to the location of `definition`
        from which merge_pointers are considered relative
    definition: dict
        a preamble row or data column definition
        can contain merge_pointer and/or process_as

    Returns
    -------
    translated_merge_pointers: Set[str]
        translated absolute merge_pointers from `definition`
    """
    translated_merge_pointers = set()
    if "merge_pointer" in definition:
        translated_merge_pointers.add(translate_merge_pointer(context, definition))
    if "process_as" in definition:
        for process in definition["process_as"]:
            translated_merge_pointers.add(translate_merge_pointer(context, process))

    return translated_merge_pointers
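
# Illustrative example (added sketch, not part of the original module). The
# context pointer and field names are hypothetical:
#
#   defn = {
#       "merge_pointer": "0/chip_id",
#       "process_as": [{"merge_pointer": "0/image_url"}],
#   }
#   pointers = get_translated_merge_pointers("assays/hande/0/records", defn)
#   # pointers == {"assays/hande/0/records/chip_id", "assays/hande/0/records/image_url"}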


def load_schema(root: str, path: str, as_html: bool = True) -> dict:
    """
    Loads the schema from the given `path` in `root`

    Parameters
    ----------
    root: str
        the folder which contains the schema
    path: str
        the schema to load
    as_html: bool = True
        whether or not to convert schema refs from .json to .html documentation urls

    Returns
    -------
    dict
        the loaded (and cached) schema, with any allOf flattened
    """
    schema_path = os.path.join(root, path)
    if (schema_path, as_html) in SCHEMA_STORE:
        return SCHEMA_STORE[(schema_path, as_html)]

    # if not converting, just return it straight away
    if not as_html:
        # when loading, always in reference to base dir
        ret = flatten_allOf(_load_dont_validate_schema(schema_path, SCHEMA_DIR))
        SCHEMA_STORE[(schema_path, as_html)] = ret
        return ret

    # otherwise we need to make some url changes
    def _json_to_html(ref: str) -> dict:
        """Update refs to refer to the URL of the corresponding documentation."""
        url = ref.replace(".json", ".html")
        url = url.replace("properties/", "")
        url = url.replace("definitions/", "")
        url = url.replace("/", ".")
        with resolver.resolving(ref) as resolved:
            description = resolved.get("description", "")

        return {"url": url, "description": description}

    # when loading or resolving, always in reference to base dir
    full_json = _load_dont_validate_schema(schema_path, SCHEMA_DIR)
    resolver = jsonschema.RefResolver(f"file://{SCHEMA_DIR}/schemas", full_json)

    # when loading, always in reference to base dir
    ret = flatten_allOf(
        _load_dont_validate_schema(schema_path, SCHEMA_DIR, on_refs=_json_to_html)
    )
    SCHEMA_STORE[(schema_path, as_html)] = ret
    return ret
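
# Illustrative usage (added sketch, not part of the original module); the file
# name "sample.json" is only a hypothetical example of a schema under SCHEMA_DIR:
#
#   html_schema = load_schema(SCHEMA_DIR, "sample.json")                 # refs become documentation urls
#   raw_schema = load_schema(SCHEMA_DIR, "sample.json", as_html=False)   # refs are not rewritten to .html
#
# Both results are cached in SCHEMA_STORE keyed on (full path, as_html).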


def load_schemas_in_directory(
    schema_dir: str = SCHEMA_DIR,
    recursive: bool = True,
) -> Dict[str, Dict[str, dict]]:
    """
    Load all JSON schemas into a dictionary keyed on the
    schema directory relative to `schema_dir`. Values are dictionaries
    mapping entity names to the loaded entity schemas.
    If `recursive`, goes through all subdirectories as well.
    Does not pass as_html to load_schema, so it defaults to True.
    """
    schemas = {}
    for root, _, paths in os.walk(schema_dir):
        root_schemas = {}
        for path in paths:
            if not path.endswith(".json"):
                continue

            schema_name = path[:-5].replace("/", ".")
            root_schemas[schema_name] = load_schema(root, path)

        if root_schemas:
            relative_root = root.replace(schema_dir, "").replace("/", ".")
            relative_root = relative_root.replace(".", "", 1)
            schemas[relative_root] = root_schemas

        if not recursive:
            break

    return schemas
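
# Illustrative usage (added sketch, not part of the original module); the
# "assays" subdirectory is only an example of a folder under SCHEMA_DIR:
#
#   all_schemas = load_schemas_in_directory()
#   top_level = all_schemas.get("", {})            # schemas directly under SCHEMA_DIR
#   assay_schemas = all_schemas.get("assays", {})  # schemas under SCHEMA_DIR/assays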


def load_subschema_from_url(definition: dict) -> dict:
    """
    Resolves "url" references by loading the corresponding subschema, keeping any existing description
    Any non-artifact urls are replaced with their corresponding definition
    Does NOT translate artifact urls, as they should be linked

    Parameters
    ----------
    definition: dict
        a jsonschemas definition that may contain "url"

    Returns
    -------
    definition: dict
        input after update
    """
    # handle any level urls
    while (
        "type" not in definition
        and "url" in definition
        and "artifacts" not in definition["url"]
    ):
        schema_path = (
            definition["url"].replace(".", "/").replace("/html", ".json").split("#")[0]
        )
        merge_pointer = (
            definition["url"].split("#")[-1] if definition["url"].count("#") else ""
        )
        schema = load_schema(SCHEMA_DIR, schema_path)

        # save the highest description to use
        description: str = definition.get("description", "")
        # definitions first because properties can point here
        if merge_pointer in schema.get("definitions", {}):
            definition.update(schema["definitions"][merge_pointer])
        elif merge_pointer in schema.get("properties", {}):
            definition.update(schema["properties"][merge_pointer])
        elif merge_pointer == "":
            definition.update(schema)

        # include any lower ones too
        # eg ihc > antibody > antibody
        if "properties" in definition:
            definition["properties"] = {
                k: load_subschema_from_url(v) if isinstance(v, dict) else v
                for k, v in definition["properties"].items()
            }

        if description:
            definition["description"] = description

    # if not elif as url can be an array itself
    if definition.get("type") == "array":
        if "description" not in definition:
            definition["description"] = definition["items"].get("description", "")
        definition["items"] = load_subschema_from_url(definition["items"])

    return definition
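
# Illustrative example (added sketch, not part of the original module). A
# definition whose ref was rewritten by load_schema's _json_to_html, e.g.
#
#   {"url": "sample.html", "description": "..."}
#
# gets replaced in place with the contents of sample.json ("sample" is a
# hypothetical schema name), whereas an artifact link such as
#
#   {"url": "artifacts.artifact_image.html", "description": "..."}
#
# is left untouched so the docs can link to it instead of inlining it.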


def nested_set(
    dic: dict,
    keys: List[str],
    value: Any,
    *,
    overwrite: bool = True,
) -> None:
    """
    Sets a value deep in a dict given a set of keys

    Parameters
    ----------
    dic: dict
        the root dict in which to set a value
    keys: List[str]
        an ordered list of keys representing nested dict levels
    value: Any
        the value to set the bottommost entry to
    *
    overwrite: bool = True
        whether or not to overwrite any existing value
    """
    for key in keys[:-1]:
        dic = dic.setdefault(key, {})

    if overwrite or keys[-1] not in dic:
        dic[keys[-1]] = value
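
# Illustrative example (added sketch, not part of the original module):
#
#   d: dict = {}
#   nested_set(d, ["a", "b", "c"], 1)
#   nested_set(d, ["a", "b", "c"], 2, overwrite=False)  # keeps the existing 1
#   # d == {"a": {"b": {"c": 1}}}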


def translate_merge_pointer(context_pointer: str, definition: dict) -> str:
    """
    Get the merge_pointer from the definition and combine it with the context
    Handles going up levels if the (relative) merge_pointer in `definition` starts with a nonzero digit

    Parameters
    ----------
    context_pointer: str
        absolute pointer to the location of `definition`
        from which merge_pointers are considered relative
    definition: dict
        a preamble row or data column definition
        must contain merge_pointer

    Returns
    -------
    translated_merge_pointer: str
        the final combined absolute merge_pointer
        will not start with '/'
    """
    context_pointer: str = context_pointer.rstrip("-").lstrip("0/").replace("#", "")
    merge_pointer: str = definition["merge_pointer"]
    if merge_pointer[0].isdigit() and int(merge_pointer[0]):
        context_pointer = "/".join(
            context_pointer.split("/")[: -int(merge_pointer[0]) - 1]
        )
        merge_pointer = merge_pointer[1:]
    return (context_pointer + merge_pointer.lstrip("0")).lstrip("/")
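
# Illustrative examples (added sketch, not part of the original module); the
# pointers and field names are hypothetical:
#
#   translate_merge_pointer("assays/hande/0/records", {"merge_pointer": "0/chip_id"})
#   # -> "assays/hande/0/records/chip_id"
#
#   # a leading nonzero digit N drops the last N + 1 components of the context
#   translate_merge_pointer("assays/hande/0/records", {"merge_pointer": "2/protocol_identifier"})
#   # -> "assays/protocol_identifier"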