CIMAC-CIDC/cidc-schemas: cidc_schemas/template.py

# -*- coding: utf-8 -*-

"""The underlying data representation of an assay or shipping manifest template."""

import os
import os.path
import logging
import uuid
import json
import jsonschema
import re
from typing import (
    Any,
    BinaryIO,
    Callable,
    Dict,
    List,
    NamedTuple,
    Optional,
    Tuple,
    Union,
)
from collections import defaultdict

from .constants import ANALYSIS_TEMPLATE_DIR, SCHEMA_DIR, TEMPLATE_DIR
from .json_validation import _load_dont_validate_schema
from .util import get_file_ext

from cidc_ngs_pipeline_api import OUTPUT_APIS

logger = logging.getLogger("cidc_schemas.template")


POSSIBLE_FILE_EXTS = [
    "bed",
    "narrowPeak",
    "narrowpeak",  # to remove from merge_pointer
    "bw",
    "tsv",
    "log",
    "summary",
    "txt",
    "gz",
    "bam",
    "bai",
    "tn",
    "vcf",
    "yaml",
    "csv",
    "zip",
    "json",
    "dedup",
    "stat",
    "sf",
    "maf",
    "tar",
    "junction",
    "pdf",
    "png",
    "cns",
    "cncf",
]


def generate_analysis_template_schemas(
    target_dir: str = ANALYSIS_TEMPLATE_DIR,
    fname_format: Callable[[str], str] = lambda file: f"{file}_analysis_template.json",
):
    """Uses output_API.json's from cidc-ngs-pipeline-api along with existing assays/components/ngs analysis templates to generate templates/analyses schemas"""
    # for each output_API.json
    for analysis, output_schema in OUTPUT_APIS.items():
        # try to convert it, but skip if it's not implemented
        # need an existing assay/components/ngs analysis schema to find merge pointers
        try:
            assay_schema = _load_dont_validate_schema(
                f"assays/components/ngs/{analysis}/{analysis}_analysis.json"
                if analysis in ["rna", "atacseq"]  # special cases currently
                else f"assays/{analysis}_analysis.json"  # all others should be here
            )
        except Exception:
            print(
                f"skipping {analysis}: failed to load its corresponding assay analysis schema"
            )
        else:
            template = _convert_api_to_template(analysis, output_schema, assay_schema)
            with open(os.path.join(target_dir, fname_format(analysis)), "w") as f:
                json.dump(template, f, indent=4)
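
# Illustrative usage sketch (not part of the library; the output directory below is
# hypothetical):
#
#   from cidc_schemas.template import generate_analysis_template_schemas
#   generate_analysis_template_schemas(target_dir="/tmp/analyses")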


def _first_in_context(path: list, context: dict):
    """
    Matches the first element of a file path to its nearest equivalent key in a schema context.
    Recurses to try more specific, more complex entries first.

    Params
    ------
    path : list
        a list of path elements to locate in @context
    context : dict
        a schema that has an element similar to @path

    Returns
    -------
    tuple
        (equivalent key in context, rest of path elements, context[key]['properties'])
        ("", [], context) if path doesn't lead to anything in the given context
    """
    ret = ("", [], {})  # index-safe equivalent of None
    if not isinstance(path, list):
        path = list(path)
    if not path:
        return ret

    # if we can step down
    if path[0] in context:
        # if this is the end, we're done
        if len(path) == 1:
            context = context[path[0]]
            if "items" in context:
                # to handle arrays
                context = context["items"]
            if "properties" in context:
                context = context["properties"]

            ret = (path[0], [], context)  # path[1:] == []
        else:
            # otherwise, see if there's something more specific first
            trial = [path[0] + "_" + path[1]] + path[2:]
            ret = _first_in_context(trial, context)
            if not ret[0]:
                context = context[path[0]]
                if "items" in context:
                    # to handle arrays
                    context = context["items"]
                if "properties" in context:
                    context = context["properties"]
                ret = (path[0], path[1:], context)

    # sometimes capitalisation changes
    if not ret[0] and path[0].lower() != path[0]:
        if sum([path[0].lower() == k.lower() for k in context.keys()]) == 1:
            # guaranteed to work and be the only item
            key = [k for k in context.keys() if k.lower() == path[0].lower()][0]

            context = context[key]
            if "items" in context:
                # to handle arrays
                context = context["items"]
            if "properties" in context:
                context = context["properties"]
            ret = [key, path[1:], context]

    # sometimes `.` are replaced by `_`
    if not ret[0] and "." in path[0]:
        trial = [path[0].replace(".", "_")] + path[1:]
        ret = _first_in_context(trial, context)
        if not ret[0]:
            trial = path[0].split(".") + path[2:]
            ret = _first_in_context(trial, context)

    # sometimes `summary` is pulled up from the end
    if not ret[0] and "summary" in path[-1] and "summary" not in path[0]:
        trial = [path[0] + "_summary"] + path[1:]
        ret = _first_in_context(trial, context)

    # sometimes two are actually stuck together
    if not ret[0] and len(path) > 1:
        trial = ["_".join(path[:2])] + path[2:]
        ret = _first_in_context(trial, context)

    # sometimes `logs` is skipped
    if not ret[0] and path[0] == "logs" and len(path) > 1:
        trial = path[1:]
        ret = _first_in_context(trial, context)

    # sometimes the key has added `_`
    if not ret[0] and path[0] in [k.replace("_", "") for k in context.keys()]:
        path[0] = [k for k in context.keys() if k.replace("_", "") == path[0]][0]
        ret = _first_in_context(path, context)

    # sometimes the key is missing `_`
    if not ret[0] and "_" in path[0]:
        trial = [path[0].replace("_", "")] + path[1:]
        ret = _first_in_context(trial, context)

    # sometimes bam isn't included in `bam.bai` -> `index`
    if not ret[0] and "bam_index" in path[-1]:
        trial = path[:-1] + [path[-1].replace("bam_index", "index")]
        ret = _first_in_context(trial, context)

    # if there isn't, we're still done
    if not ret[0]:
        ret = ("", path, context)

    return ret
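
# Illustrative sketch of how _first_in_context walks a schema (the toy context below
# is made up, not a real assay schema):
#
#   ctx = {"report": {"properties": {"config": {"type": "string"}}}}
#   _first_in_context(["report", "config"], ctx)
#   # -> ("report", ["config"], {"config": {"type": "string"}})
#   _first_in_context(["config"], {"config": {"type": "string"}})
#   # -> ("config", [], {"type": "string"})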


_excluded_samples_worksheet_snippet = {
    "Excluded Samples": {
        "prism_data_object_pointer": "/excluded_samples/-",
        "data_columns": {
            "Samples Excluded From Analysis": {
                "cimac id": {
                    "type_ref": "sample.json#properties/cimac_id",
                    "merge_pointer": "0/cimac_id",
                },
                "reason": {
                    "type_ref": "assays/components/excluded_samples.json#items/properties/reason_excluded",
                    "merge_pointer": "0/reason_excluded",
                },
            }
        },
    },
}


def _initialize_template_schema(name: str, title: str, pointer: str):
    obj_root_schema = (
        f"assays/components/ngs/{name}/{name}_analysis.json"
        if name in ["rna", "atacseq"]  # special cases currently
        else f"assays/{name}_analysis.json"  # all others should be here
    )

    expanded_name = {
        "atacseq": f"Assay by Transposase-Accessible Chromatin by Sequencing (ATACseq)",
        "rna": "RNA sequencing Level 1",
        "wes": "Whole Exome Sequencing (WES) Tumor-Normal Paired",
        "wes_tumor_only": "Whole Exome Sequencing (WES) Tumor-Only",
    }[name]

    # static
    template = {
        "title": f"{title} analysis template",
        "description": f"{expanded_name} analysis submission.",
        "prism_template_root_object_schema": obj_root_schema,
        "prism_template_root_object_pointer": f"/analysis/{name}_analysis"
        + ("/0" if name == "atacseq" else ""),
        "properties": {
            "worksheets": {
                f"{title} Analysis": {
                    "preamble_rows": {
                        "protocol identifier": {
                            "merge_pointer": f"{3 if name == 'atacseq' else 2}/protocol_identifier",
                            "type_ref": "clinical_trial.json#properties/protocol_identifier",
                        },
                        "folder": {
                            "do_not_merge": True,
                            "type": "string",
                            "allow_empty": True,
                        },
                    },
                    "prism_data_object_pointer": f"/{pointer}/-",
                    "data_columns": {f"{title} Runs": {}},
                },
                **_excluded_samples_worksheet_snippet,
            }
        },
    }

    # assay specific additions
    if name == "atacseq":
        template["properties"]["worksheets"][f"{title} Analysis"]["preamble_rows"][
            "batch id"
        ] = {
            "merge_pointer": "0/batch_id",
            "type_ref": "assays/atacseq_assay.json#properties/batch_id",
        }

    return template
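
# Illustrative sketch (the "level_1" pointer below is hypothetical; in practice the
# pointer is derived from the assay analysis schema in _convert_api_to_template):
#
#   tmpl = _initialize_template_schema("rna", "RNAseq level 1", "level_1")
#   tmpl["title"]
#   # -> "RNAseq level 1 analysis template"
#   list(tmpl["properties"]["worksheets"])
#   # -> ["RNAseq level 1 Analysis", "Excluded Samples"]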


def _calc_merge_pointer(file_path: str, context: dict, key: str):
    """Return the merge_pointer in context that the file_path directs to

    Params
    ------
    file_path : str
        the local file path for the file
    context : dict
        the assay schema for the analysis onto which to map the file path
    key : str
        the key from output_API.json under which the file path is listed
        used for WES handling of `all_summaries`
        e.g. 'tumor cimac id', 'normal cimac id'
    """
    # remove any fill-in-the-blanks
    while "{" in file_path:
        temp = file_path.split("{", 1)
        file_path = temp[0] + temp[1].split("}", 1)[1].strip("._")
    # this can generate `/{id}/` -> `//` and `-{id}-` -> `--`, so fix those
    file_path = file_path.replace("//", "/").replace("--", "_").lower()

    # specialty conversions for existing non-standard usage
    # all text should be lowercase
    fixes = {  # old : new
        # WES V3
        "output.twist.neoantigen": "filter.neoantigen",
        "optitype/result": "hla/optitype_result",
        "tcellextrect/": "tcell/",
        "hlahd/result/": "hla/hla",
        "xhla/": "hla/xhla",
        "clonality/segments": "copynumber/segments",
        "clonality/genome": "copynumber/genome",
        "clonality/chromosome": "copynumber/chromosome",
        "clonality/sequenza": "copynumber/sequenza",
        "clonality/facets": "copynumber/facets",
        "clonality/bin50": "copynumber/sequenza_final",
        "consensus_merged": "consensus",
        "clonality/alternative": "purity/alternative",
        "clonality/cp": "purity/cp",
        "clonality/pyclone6": "clonality/",
        "clonality/results.summary": "clonality/summary",
        "cnvkit/call": "copynumber/cnv_segments",
        "cnvkit/scatter": "copynumber/cnv_scatterplot",
        "cnvkit/": "copynumber/",
        "purity/cncf": "copynumber/facets",
        "purity/facets": "copynumber/facets",
        "/align/recalibrated": "/alignment/align_recalibrated",
        # Pre WES V3
        ".bam.bai": ".bam.index",
        "clonality/": "clonality/clonality/",
        "copynumber/": "copynumber/copynumber_",
        "tn_corealigned.bam": "tn_corealigned",
        "all_samples_summaries": "all_summaries",
        "/align/sorted.dedup": "/alignment/align_sorted_dedup",
        "sample_summar": "summar",
        "all_epitopes": "epitopes",
        ".txt.tn.tsv": ".tsv",
        "report/somatic_variants/05_": "report/",
        "report/neoantigens/01_hla_r": "neoantigen/HLA_r",
        "msisensor2": "msisensor",
        "msisensor/single/msisensor.txt": "msisensor/msisensor_report.txt",
        "/report.": "/report/report.",
        "wes_meta/02_": "",
        "json/wes.json": "wes_sample.json",
        "vcfcompare": "vcf_compare",
        "trust4/": "trust4/trust4_",
        "addsample_report": "sample_report",
        "chimeric.out.junction": "chimeric_out_junction.junction",
        "haplotyper.rna.vcf": "haplotyper.vcf",
        # atacseq analysis
        "rep1/rep1_": "",  # rep1 are really {replicate} but static
        "treat_pipleup.bw": "bigwig",  # weird name to file type
        "align/sorted.bam": "aligned_sorted_bam",  # simple change
    }
    for old, new in fixes.items():
        file_path = file_path.replace(old, new)

    # split path into pieces
    file_path = file_path.split("/")

    # specialty conversions for file names / extensions only
    if "tnscope" in file_path[-1]:
        if "twist" not in file_path[-1]:
            temp = file_path[-1].split(".")
            if temp[-1] != "gz":
                file_path[-1] = ".".join(temp[-1:] + temp[:-1])
            else:
                file_path[-1] = ".".join(temp[-2:] + temp[:-2])
    if "mosdepth" in file_path[-1]:
        temp = file_path[-1].split(".")
        file_path[-1] = ".".join(temp[1:4]) + "_" + temp[0]

    # remove any extra `analysis` at the front
    if file_path[0] == "analysis":
        file_path = file_path[1:]

    # look into first step
    curr_step, file_path, curr_context = _first_in_context(file_path, context)
    merge_pointer = "0/" + curr_step
    # then off to the races
    while len(curr_step):
        curr_step, file_path, curr_context = _first_in_context(file_path, curr_context)
        if curr_step:
            merge_pointer += "/" + curr_step

    return merge_pointer
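
# Illustrative sketch (the toy path and context below are made up; real paths come from
# output_API.json and real contexts from the assay analysis schemas):
#
#   ctx = {"report": {"properties": {"config_yaml": {"type": "string"}}}}
#   _calc_merge_pointer("analysis/{run id}/report/config.yaml", ctx, "run id")
#   # -> "0/report/config_yaml"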


def _calc_gcs_uri_path(name: str, merge_pointer: str):
    # generate GCS URI ending from merge pointer
    if "wes" in name:  # WES doesn't get the beginning path for some reason
        file = merge_pointer.lstrip("0").rsplit("/", 1)[1]
    else:
        file = merge_pointer[2:]
    # special handling for .bam.bai
    file = file.replace("_bam_index", "_bam_bai").replace("_index", "_bam_bai")

    # guess at file extension from merge_pointer
    file = file.split("_")

    # trim off file extension from path
    while file[-1] in POSSIBLE_FILE_EXTS:  # go from the right
        file.pop()
    file = "_".join(file)

    # special handling, then return
    file = (
        file.replace("align_", "")
        .replace("cnv_calls", "cnvcalls")
        .replace("_bed.", ".bed")
    )
    return file
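
# Illustrative sketch of how a merge pointer becomes the tail of a GCS URI
# (the pointers below are made up):
#
#   _calc_gcs_uri_path("rna", "0/report/config_yaml")
#   # -> "report/config"
#   _calc_gcs_uri_path("wes", "/report/config_yaml")
#   # -> "config"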


def _convert_api_to_template(name: str, schema: dict, assay_schema: dict):
    # import here to avoid circular dependence where prism imports Template
    from .prism.merger import InvalidMergeTargetException

    # so many different ways of writing it
    title = {
        "atacseq": "ATACseq",
        "rna": "RNAseq level 1",
        "wes": "WES",
        "wes_tumor_only": "WES tumor-only",
    }[name]

    pointer = [
        k
        for k, subschema in assay_schema["properties"].items()
        if k != "excluded_samples" and "items" in subschema
    ]
    assert len(pointer) == 1, f"Cannot decide which pointer to use: {pointer}"
    pointer = pointer[0]

    template = _initialize_template_schema(name, title, pointer)

    # for each entry
    subtemplate = {}
    used_merge_pointers, used_gcs_uris = [], []
    for long_key, entries in schema.items():
        if long_key == "batch id":
            # batch-level artifacts are assigned in the preamble
            # add to _initialize_template_schema
            continue

        # so many ways to write this too
        if long_key == "id":
            long_key = "cimac id"  # assume CIMAC if just 'id'

        if "cimac id" in long_key:
            pysafe_key = "id"
        elif long_key == "run id":
            pysafe_key = "run"
        else:
            pysafe_key = long_key.replace(" ", "_")

        if long_key == "normal cimac id":
            short_key = "normal"
        elif long_key == "tumor cimac id" or long_key == "tumor_cimac_id":
            short_key = "tumor"
        else:
            short_key = long_key.replace(" ", "_")

        # static header
        if "cimac id" in long_key:
            type_ref = "sample.json#properties/cimac_id"
            if "wes" in name:
                merge_pointer = f"/{short_key}/cimac_id"
            else:
                merge_pointer = f"/{long_key.replace(' ','_')}"
        elif "wes" in name:
            type_ref = f"assays/{name}_analysis.json#definitions/"
            if long_key == "run id":
                type_ref += f"{'sample' if 'tumor_only' in name else 'pair'}_analysis/properties/"

            if short_key in ["normal", "tumor"]:
                type_ref += f"{short_key}/properties/cimac_id"
            else:
                type_ref += f"{long_key.replace(' ','_')}"
            merge_pointer = f"/{long_key.replace(' ','_')}"

        else:
            type_ref = f"assays/components/ngs/{name}/"

            if name == "rna":
                type_ref += "rna_level1"
            else:
                type_ref += name

            type_ref += f"_analysis.json#properties/{long_key.replace(' ','_')}"
            merge_pointer = f"/{long_key.replace(' ','_')}"

        subtemplate[long_key] = {
            "merge_pointer": merge_pointer,
            "type_ref": type_ref,
            "process_as": [],
        }

        # keep track of where we are in the analysis schema
        context = assay_schema["properties"][pointer]["items"]["properties"]
        if short_key not in context:
            raise InvalidMergeTargetException(
                f"{long_key} in {name} does not have a corresponding entry in `assays/components/ngs/{name}/{name}_analysis.json`"
            )
        elif "properties" in context[short_key]:
            context = context[short_key]["properties"]

        # for each entry in the output_API.json
        for entry in entries:  # entries = list[dict]
            # get the local file path
            file_path = entry["file_path_template"]

            # calculate merge_pointer
            merge_pointer = _calc_merge_pointer(file_path, context, long_key)
            if "wes" in name:
                # WES doesn't get starting 0 for some reason
                merge_pointer = merge_pointer.lstrip("0")
                # also gets normal / tumor prepended
                if short_key in ["normal", "tumor"]:
                    merge_pointer = f"/{short_key}{merge_pointer}"

            if merge_pointer in ["0/", "/"]:  # default value, possibly modified
                raise InvalidMergeTargetException(
                    f"{file_path} cannot be mapped to a location of the data object"
                )
            elif merge_pointer in used_merge_pointers:
                raise InvalidMergeTargetException(
                    f"{file_path} causes a collision for inferred merge target {merge_pointer}"
                )

            # let's check its target too
            merge_target = context.copy()
            for ptr in merge_pointer.lstrip("0/").split("/"):
                if "properties" in merge_target and ptr not in merge_target:
                    merge_target = merge_target["properties"]

                if ptr in merge_target:
                    merge_target = merge_target[ptr]
                elif ptr in ["normal", "tumor"]:
                    # already handled, special add ins
                    # only sometimes needs to be stripped though
                    # if incorrect, will still error in next step
                    continue
                else:
                    print(file_path)
                    raise ValueError("Generated pointer to non-existent merge_target")

            # GCS URI start is static
            gcs_uri = f"{{protocol identifier}}/{name}/"
            if "wes" in name:  # wes has its own scheme
                gcs_uri += "{run id}/analysis/"
                if long_key != "run id":
                    gcs_uri += f"{short_key}/{{{long_key}}}/"
            else:
                gcs_uri += f"{{{long_key}}}/analysis/"

            gcs_uri += _calc_gcs_uri_path(name, merge_pointer)

            # now get actual file extension from file_path_template
            ext = (
                entry["file_path_template"]
                .split("/")[-1]
                .replace("sample_summary", ".summary")
                .split(".")
            )
            # only keep valid parts, in order they appear
            ext = [i for i in ext if i in POSSIBLE_FILE_EXTS]
            ext = ".".join(ext)
            gcs_uri += "." + ext

            if "wes" in name:
                gcs_uri = gcs_uri.replace(
                    ".summary", "_summary"
                )  # doesn't change in WES for some reason

            # it's unclear whether this can happen without first causing a merge collision above,
            # so it isn't covered by tests
            if gcs_uri in used_gcs_uris:
                raise InvalidMergeTargetException(
                    f"{file_path} caused a collision for inferred destination URI {gcs_uri}"
                )

            # append empty file extension if entry['file_path_template'] doesn't have one so that prism doesn't break
            if entry["file_path_template"].split("/")[-1].count(".") == 0:
                entry["file_path_template"] += "."

            # fill in `process_as` entry
            subsubtemplate = {
                "parse_through": f"lambda {pysafe_key}: f'{{folder or \"\"}}{entry['file_path_template'].replace(long_key, pysafe_key)}'",
                "merge_pointer": merge_pointer,
                "gcs_uri_format": gcs_uri,
                "type_ref": "assays/components/local_file.json#properties/file_path",
                "is_artifact": 1,
            }
            if entry.get("optional", False):
                subsubtemplate["allow_empty"] = True

            subtemplate[long_key]["process_as"].append(subsubtemplate)
            used_merge_pointers.append(merge_pointer)
            used_gcs_uris.append(gcs_uri)

        # store it all on the static part
        template["properties"]["worksheets"][f"{title} Analysis"]["data_columns"][
            f"{title} Runs"
        ] = subtemplate

    # add comments to all of them
    if "wes" in name:
        type_ref = f"assays/{name}_analysis.json#definitions/"
        type_ref += "sample" if "tumor_only" in name else "pair"
        type_ref += "_analysis/properties/comments"
    else:
        type_ref = f"assays/components/ngs/{name}/"
        if name == "rna":
            type_ref += "rna_level1_analysis.json#properties/comments"
        else:  # name == "atacseq"
            type_ref += f"{name}_analysis.json#definitions/entry/properties/comments"

    template["properties"]["worksheets"][f"{title} Analysis"]["data_columns"][
        f"{title} Runs"
    ]["comments"] = {
        "type_ref": type_ref,
        "merge_pointer": "0/comments",
        "allow_empty": True,
    }

    return template


def _get_template_path_map() -> dict:
    """Build a mapping from template schema types to template schema paths."""
    path_map = {}
    for template_type_dir in os.listdir(TEMPLATE_DIR):
        abs_type_dir = os.path.join(TEMPLATE_DIR, template_type_dir)
        if os.path.isdir(abs_type_dir):
            for template_file in os.listdir(abs_type_dir):
                template_type = template_file.replace("_template.json", "")
                template_schema_path = os.path.join(abs_type_dir, template_file)
                path_map[template_type] = template_schema_path

    return path_map
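
# Illustrative sketch of the resulting map (the keys and directory names below are
# hypothetical examples of what lives under TEMPLATE_DIR):
#
#   {
#       "pbmc": ".../templates/manifests/pbmc_template.json",
#       "rna_analysis": ".../templates/analyses/rna_analysis_template.json",
#   }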


_TEMPLATE_PATH_MAP = _get_template_path_map()


def generate_empty_template(schema_path: str, target_path: str):
    """Write the .xlsx template for the given schema to the target path."""
    logger.info(f"Writing empty template for {schema_path} to {target_path}.")
    template = Template.from_json(schema_path)
    template.to_excel(target_path)


def generate_all_templates(target_dir: str):
    """
    Generate empty template .xlsx files for every available template schema and
    write them to the target directory.
    """
    # We expect two directories: one for metadata schemas and one for manifests
    for template_type_dir in os.listdir(TEMPLATE_DIR):
        if not template_type_dir.startswith("."):
            # Create the directory for this template type
            target_subdir = os.path.join(target_dir, template_type_dir)
            if not os.path.exists(target_subdir):
                os.makedirs(target_subdir)

            schema_subdir = os.path.join(TEMPLATE_DIR, template_type_dir)

            # Create a new empty template for each template schema in schema_subdir
            for template_schema_file in os.listdir(schema_subdir):
                if not template_schema_file.startswith("."):
                    schema_path = os.path.join(schema_subdir, template_schema_file)
                    template_xlsx_file = template_schema_file.replace(".json", ".xlsx")
                    target_path = os.path.join(target_subdir, template_xlsx_file)
                    generate_empty_template(schema_path, target_path)


class AtomicChange(NamedTuple):
    """
    Represents exactly one "value set" operation on some data object.
    `pointer` is a json-pointer string showing where to set `value`.
    """

    pointer: str
    value: Any
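
# For example, setting a sample's CIMAC id on a freshly created object could be
# expressed as (illustrative values only):
#
#   AtomicChange(pointer="0/cimac_id", value="CM-TEST-0001-01")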


class LocalFileUploadEntry(NamedTuple):
    local_path: str
    gs_key: str
    upload_placeholder: str
    metadata_availability: Optional[bool]
    allow_empty: Optional[bool]


class _FieldDef(NamedTuple):
    """
    Represents all the specs on processing a specific value
    """

    key_name: str
    coerce: Callable
    merge_pointer: str
    # MAYBE TODO unify type and type_ref
    type: Union[str, None] = None
    type_ref: Union[str, None] = None
    gcs_uri_format: Union[str, dict, None] = None
    extra_metadata: bool = False
    parse_through: Union[str, None] = None
    do_not_merge: bool = False
    allow_empty: bool = False
    encrypt: bool = False
    is_artifact: Union[str, bool] = False

    def artifact_checks(self):
        # TODO maybe move these checks to constructor?
        # Though it will require changing implementation
        # from NamedTuple to something more extensible like attrs or dataclasses
        if self.is_artifact and not self.gcs_uri_format:
            raise Exception("Empty gcs_uri_format")

        if self.gcs_uri_format and not self.is_artifact:
            raise Exception("gcs_uri_format defined for a non-artifact field")

        if self.gcs_uri_format and not isinstance(self.gcs_uri_format, (dict, str)):
            raise Exception(
                f"Bad gcs_uri_format: {type(self.gcs_uri_format)} - should be dict or str."
            )

        if (
            isinstance(self.gcs_uri_format, dict)
            and "format" not in self.gcs_uri_format
        ):
            raise Exception("dict-type gcs_uri_format should have a 'format' def")

    def process_value(
        self, raw_val, format_context: dict, encrypt_fn: Callable
    ) -> Tuple[List[AtomicChange], List[LocalFileUploadEntry]]:

        logger.debug(f"Processing field spec: {self}")

        # skip nullable
        if self.allow_empty and raw_val is None:
            return [], []

        if self.do_not_merge:
            logger.debug(f"Ignoring {self.key_name!r} due to 'do_not_merge' == True")
            return [], []

        if self.parse_through:
            try:
                raw_val = eval(self.parse_through, format_context, {})(raw_val)

            # catching everything, because of eval
            except Exception as e:
                _field_name = self.merge_pointer.rsplit("/", 1)[-1]
                raise ParsingException(
                    f"Cannot extract {_field_name} from {self.key_name} value: {raw_val!r}\n{e}"
                ) from e

        # or set/update value in-place in data_obj dictionary

        try:
            val, files = self._calc_val_and_files(raw_val, format_context, encrypt_fn)
        except ParsingException:
            raise
        except Exception as e:
            # this shouldn't wrap all exceptions into a parsing one,
            # but we need to split calc_val_and_files to handle them separately here
            # because we still want to catch all from `.coerce`
            raise ParsingException(
                f"Can't parse {self.key_name!r} value {str(raw_val)!r}: {e}"
            ) from e

        if self.is_artifact == True:  # note: is_artifact == "multi" falls through to the else branch
            placeholder_pointer = self.merge_pointer + "/upload_placeholder"
            facet_group_pointer = self.merge_pointer + "/facet_group"
            return (
                [
                    AtomicChange(placeholder_pointer, val["upload_placeholder"]),
                    AtomicChange(facet_group_pointer, val["facet_group"]),
                ],
                files,
            )
        else:
            return [AtomicChange(self.merge_pointer, val)], files

    ## TODO easy - split val coerce / files calc to handle exceptions separately
    ## TODO hard - files (artifact and multi) should be just coerce?
    def _calc_val_and_files(self, raw_val, format_context: dict, encrypt_fn: Callable):
        """
        Processes one field value based on `_FieldDef` taken from a ..._template.json schema.
        Calculates a file upload entry if is_artifact.
        """

        val = self.coerce(raw_val)

        if self.encrypt:
            val = encrypt_fn(val)

        if not self.is_artifact:
            return val, []  # no files if it's not an artifact

        files = []

        # deal with multi-artifact
        if self.is_artifact == "multi":
            logger.debug(f"      collecting multi local_file_path {self}")

            # In case of is_artifact=multi we expect the value to be a comma-separated
            # list of local_file paths (that we will convert to uuids)
            # and also for the corresponding DM schema to be an array of artifacts
            # that we will fill with upload_placeholder uuids

            # So our value is a list of artifact placeholders
            val = []

            # and we iterate through local file paths:
            for num, local_path in enumerate(raw_val.split(",")):
                # Ignoring errors here as we're sure `coerce` will just return a uuid
                file_uuid = self.coerce(local_path)

                artifact, facet_group = self._format_single_artifact(
                    local_path=local_path,
                    uuid=file_uuid,
                    format_context=dict(
                        format_context,
                        num=num  # add num to be able to generate
                        # different gcs keys for each multi-artifact file.
                    ),
                )

                val.append(
                    {"upload_placeholder": file_uuid, "facet_group": facet_group}
                )

                files.append(artifact)

        else:
            logger.debug(f"Collecting local_file_path {self}")
            artifact, facet_group = self._format_single_artifact(
                local_path=raw_val, uuid=val, format_context=format_context
            )

            val = {"upload_placeholder": val, "facet_group": facet_group}

            files.append(artifact)

        return val, files

    def _format_single_artifact(
        self, local_path: str, uuid: str, format_context: dict
    ) -> Tuple[LocalFileUploadEntry, str]:
        """Return a LocalFileUploadEntry for this artifact, along with the artifact's facet group."""

        # By default we think gcs_uri_format is a format-string
        format = self.gcs_uri_format
        try_formatting = lambda: format.format_map(format_context)

        # or it could be a dict
        if isinstance(self.gcs_uri_format, dict):
            if "check_errors" in self.gcs_uri_format:
                # `eval` should be fine, as we're controlling the code argument in templates
                err = eval(self.gcs_uri_format["check_errors"])(local_path)
                if err:
                    raise ParsingException(err)

            format = self.gcs_uri_format["format"]
            # `eval` should be fine, as we're controlling the code argument in templates
            try_formatting = lambda: eval(format)(local_path, format_context)

        try:
            gs_key = try_formatting()
        except Exception as e:
            logger.error(str(e), exc_info=True)
            raise ParsingException(
                f"Can't format destination gcs uri for {self.key_name!r}: {format}"
            )

        # remove [ or ] in GCS URI because gsutil treats brackets as a character set
        # and matches multiple file paths, which is not allowed for a target
        gs_key = gs_key.replace("[", "").replace("]", "")

        # if no . in the GCS URI, don't check file extension as none specified
        if "." in self.gcs_uri_format:
            expected_extension = get_file_ext(gs_key)
            provided_extension = get_file_ext(local_path)
            if provided_extension != expected_extension:
                raise ParsingException(
                    f"Expected {'.' + expected_extension} for {self.key_name!r} but got {'.' + provided_extension!r} instead."
                )

        facet_group = _get_facet_group(format)

        return (
            LocalFileUploadEntry(
                local_path=local_path,
                gs_key=gs_key,
                upload_placeholder=uuid,
                metadata_availability=self.extra_metadata,
                allow_empty=self.allow_empty,
            ),
            facet_group,
        )


class ParsingException(ValueError):
    pass


_empty_defaultdict: Dict[str, str] = defaultdict(str)


def _get_facet_group(gcs_uri_format: str) -> str:
    """ "
    Extract a file's facet group from its GCS URI format string by removing
    the "format" parts.
    """
    # Provide empty strings for any GCS URI format variables
    try:
        # First, attempt to call the format string as a lambda
        # eval is safe here because all possible values are defined in schemas
        fmted_string = eval(gcs_uri_format)("", _empty_defaultdict)
    except Exception:
        # Fall back to string interpolation via format_map
        fmted_string = gcs_uri_format.format_map(_empty_defaultdict)

    # Clear any double slashes
    facet_group = re.sub(r"\/\/*", "/", fmted_string)

    return facet_group
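
# Illustrative sketch (the format string below is a made-up example in the style of
# the generated templates):
#
#   _get_facet_group("{protocol identifier}/wes/{run id}/analysis/report.tar.gz")
#   # -> "/wes/analysis/report.tar.gz"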


class Template:
    """
    Configuration describing a manifest or assay template

    Properties:
        schema {dict} -- a validated template JSON schema
        worksheets {Dict[str, dict]} -- a mapping from worksheet names to worksheet schemas
    """

    ignored_worksheets = ["Legend", "Data Dictionary"]

    def __init__(self, schema: dict, type: str, schema_root: str = SCHEMA_DIR):
        """
        Load the schema defining a manifest or assay template.

        Arguments:
            schema {dict} -- a valid JSON schema describing a template
            type -- schema file ..._template.json prefix. Used for repr.
        """
        self.schema = schema
        self.type = type
        self.schema_root = schema_root
        self.worksheets = self._extract_worksheets()
        self.key_lu = self._load_keylookup()

    def __repr__(self):
        return f"<Template({self.type})>"

    def _extract_worksheets(self) -> Dict[str, dict]:
        """Build a mapping from worksheet names to worksheet section schemas"""

        assert (
            "worksheets" in self.schema["properties"]
        ), f'{self.type} schema missing "worksheets" property'
        worksheet_schemas = self.schema["properties"]["worksheets"]

        worksheets = {}
        for name, schema in worksheet_schemas.items():
            self._validate_worksheet(name, schema)

            worksheets[name] = self._process_worksheet(schema)

        return worksheets

    @staticmethod
    def _process_fieldname(name: str) -> str:
        """Convert field name to lowercase to ease matching"""
        return name.lower()

    @staticmethod
    def _process_worksheet(worksheet: dict) -> dict:
        """Do pre-processing on a worksheet"""

        def process_fields(schema: dict) -> dict:
            processed = {}
            for field_name, field_schema in schema.items():
                field_name_proc = Template._process_fieldname(field_name)
                processed[field_name_proc] = field_schema
            return processed

        # Process field names to ensure we can match on them later
        processed_worksheet = {}
        for section_name, section_schema in worksheet.items():
            if section_name == "preamble_rows":
                processed_worksheet[section_name] = process_fields(section_schema)
            elif section_name == "data_columns":
                data_schemas = {}
                for table_name, table_schema in section_schema.items():
                    data_schemas[table_name] = process_fields(table_schema)
                processed_worksheet[section_name] = data_schemas
            elif section_name == "prism_arbitrary_data_merge_pointer":
                processed_worksheet[
                    "prism_arbitrary_data_merge_pointer"
                ] = section_schema

        return processed_worksheet

    def _get_ref_coerce(self, ref: str):
        """
        This function takes a json-schema style $ref pointer,
        resolves the referenced schema, and determines the best Python
        function to coerce values to the referenced type.

        Args:
            ref: /path/to/schema.json

        Returns:
            Python function pointer
        """

        referer = {"$ref": ref}

        resolver_cache = {}
        schemas_dir = f"file://{self.schema_root}/schemas"
        while "$ref" in referer:
            # get the entry
            resolver = jsonschema.RefResolver(schemas_dir, referer, resolver_cache)
            _, referer = resolver.resolve(referer["$ref"])

        entry = referer

        return self._get_typed_entry_coerce(entry)

    @staticmethod
    def _gen_upload_placeholder_uuid(_):
        return str(uuid.uuid4())

    @staticmethod
    def _get_typed_entry_coerce(entry: dict):
        """
        This function takes a json-schema style type
        and determines the best Python function
        to coerce values to that type.
        """
        # if it's an artifact that will be loaded through local file
        # we just return uuid as value
        if entry.get("$id") in ["local_file_path", "local_file_path_list"]:
            return Template._gen_upload_placeholder_uuid

        if isinstance(entry["type"], list):
            return Template._get_list_type_coerce(entry["type"])
        else:
            return Template._get_simple_type_coerce(entry["type"])

    @staticmethod
    def _get_list_type_coerce(type_list: List[str]):
        if "boolean" in type_list and ("integer" in type_list or "number" in type_list):
            raise ParsingException(
                f"Multiple conflicting coercions found - cannot have boolean alongside integer/numer: {type_list}"
            )

        coerce_fns = {t: Template._get_simple_type_coerce(t) for t in type_list}

        def coerce(val, func_map: Dict[str, Callable]):
            # Types listed in decreasing order of specificity -
            # i.e., always cast as an integer before casting as a number,
            # always cast as a number before casting as a string.
            prioritized_types = ["boolean", "integer", "number", "string"]

            errors = {}
            for t in prioritized_types:
                try:
                    return func_map[t](val)
                except Exception as e:
                    errors[t] = e

            raise ParsingException(f"No valid coercion found: {errors}")

        return lambda v: coerce(v, coerce_fns)

    @staticmethod
    def _get_simple_type_coerce(t: str):
        if t == "string":
            return str
        elif t == "integer":
            return int
        elif t == "number":
            return float
        elif t == "boolean":
            return bool
        else:
            raise NotImplementedError(f"no coercion available for type:{t}")
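
    # Illustrative sketch of how a multi-type coercion behaves (toy values only):
    #
    #   coerce = Template._get_list_type_coerce(["string", "integer"])
    #   coerce("4")    # -> 4 (integer is tried before string)
    #   coerce("4.5")  # -> "4.5" (int() fails, so it falls back to str)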

    def _get_coerce(self, field_def: dict) -> Callable:
        """Checks if we have a cast func for that 'type_ref' and returns it."""

        orig_fdef = dict(field_def)

        if "type_ref" in field_def or "ref" in field_def:
            coerce = self._get_ref_coerce(
                field_def.pop("ref", field_def.pop("type_ref"))
            )
        elif "type" in field_def:
            coerce = self._get_typed_entry_coerce(field_def)

        elif field_def.get("do_not_merge", False):

            raise Exception(
                "Template fields flagged with `do_not_merge` do not have a typecast function"
            )

        else:
            raise Exception(
                f'Either "type" or "type_ref" or "$ref" should be present '
                f"in each template schema field def, but not found in {orig_fdef!r}"
            )

        return coerce

    def _load_field_defs(self, key_name, def_dict) -> List[_FieldDef]:
        """
        Converts a template schema "field definition" to a list of typed `_FieldDef`s,
        which ensures we get only supported matching logic.
        """

        def_dict = dict(def_dict)  # so we don't mutate original
        process_as = def_dict.pop("process_as", [])
        res = []

        if not def_dict.get("do_not_merge"):

            # remove all unsupported _FieldDef keys
            for f in list(def_dict):
                if f in _FieldDef._fields:
                    continue
                def_dict.pop(f, None)

            try:
                coerce = self._get_coerce(def_dict)
                fd = _FieldDef(key_name=key_name, coerce=coerce, **def_dict)
                fd.artifact_checks()
            except Exception as e:
                raise Exception(f"Couldn't load mapping for {key_name!r}: {e}") from e

            res.append(fd)

        # "process_as" allows to define additional places/ways to put a match
        # somewhere in the resulting doc, with additional processing.
        # E.g. we need to strip cimac_id='CM-TEST-0001-01' to 'CM-TEST-0001'
        # and put it in this sample parent's cimac_participant_id
        for extra_fdef in process_as:
            # recursively adds coerce to each sub 'process_as' item
            res.extend(self._load_field_defs(key_name, extra_fdef))

        return res

    def _load_keylookup(self) -> dict:
        """
        The excel spreadsheet uses human-friendly (no underscores) names
        for properties, while the corresponding fields in the schema
        have different names. This function builds a lookup dictionary
        to map between them.

        It also populates the coercion function for each
        property.

        Returns:
            Dictionary keyed by spreadsheet property names
        """

        # create a key lookup dictionary
        key_lu = defaultdict(dict)

        # loop over each worksheet
        for ws_name, ws_schema in self.worksheets.items():

            ws_name = self._process_fieldname(ws_name)

            try:
                # loop over each row in the preamble
                for preamble_key, preamble_def in ws_schema.get(
                    "preamble_rows", {}
                ).items():

                    key_lu[ws_name][preamble_key] = self._load_field_defs(
                        preamble_key, preamble_def
                    )

                # load the data columns
                for section_key, section_def in ws_schema.get(
                    "data_columns", {}
                ).items():
                    for column_key, column_def in section_def.items():

                        key_lu[ws_name][column_key] = self._load_field_defs(
                            column_key, column_def
                        )
            except Exception as e:
                raise Exception(
                    f"Error in template {self.type!r}/{ws_name!r}: {e}"
                ) from e

        # converting from defaultdict to just dict (of dicts)
        # to be able to catch unexpected worksheet error
        return dict(key_lu)

    @staticmethod
    def _sanitize_arbitrary_key(key):
        # TODO figure out sanitization - non-ascii or non-unicode or something
        return key

    def _process_arbitrary_val(
        self, key, raw_val, arbitrary_data_merge_pointer
    ) -> AtomicChange:
        """
        Processes one field value based on the 'prism_arbitrary_data' directive in a template schema.
        Returns a single `AtomicChange` targeting the arbitrary-data merge pointer.
        """

        return AtomicChange(
            arbitrary_data_merge_pointer + "/" + self._sanitize_arbitrary_key(key),
            raw_val,
        )

    def process_field_value(
        self,
        worksheet: str,
        key: str,
        raw_val,
        format_context: dict,
        encrypt_fn: Callable,
    ) -> Tuple[List[AtomicChange], List[LocalFileUploadEntry]]:
        """
        Processes one field value based on field_def taken from a template schema.
        Calculates a list of `AtomicChange`s within a context object
        and a list of file upload entries.
        A list of changes (rather than just one) might arise from a `process_as` section
        in the template schema, which allows multi-processing of a single cell value.
        """

        logger.debug(f"Processing property {worksheet!r}:{key!r} - {raw_val!r}")
        try:
            ws_field_defs = self.key_lu[self._process_fieldname(worksheet)]
            ws = self.worksheets[worksheet]
        except KeyError:
            raise ParsingException(f"Unexpected worksheet {worksheet!r}.")

        try:
            field_defs = ws_field_defs[self._process_fieldname(key)]
        except KeyError:
            if ws.get("prism_arbitrary_data_merge_pointer"):
                return (
                    [
                        self._process_arbitrary_val(
                            key, raw_val, ws["prism_arbitrary_data_merge_pointer"]
                        )
                    ],
                    [],
                )
            else:
                raise ParsingException(f"Unexpected property {worksheet!r}:{key!r}")

        logger.debug(f"Found {len(field_defs)} field defs")

        changes, files = [], []
        for f_def in field_defs:
            try:
                chs, fs = f_def.process_value(raw_val, format_context, encrypt_fn)
            except Exception as e:
                raise ParsingException(e)

            changes.extend(chs)
            files.extend(fs)

        return changes, files

    # XlTemplateReader only knows how to format these types of sections
    VALID_WS_SECTIONS = set(
        [
            "preamble_rows",
            "data_columns",
            "prism_preamble_object_pointer",
            "prism_data_object_pointer",
            "prism_preamble_object_schema",
            "prism_arbitrary_data_section",
            "prism_arbitrary_data_merge_pointer",
        ]
    )

    @staticmethod
    def _validate_worksheet(ws_title: str, ws_schema: dict):
        # Ensure all worksheet sections are supported
        ws_sections = set(ws_schema.keys())
        unknown_props = ws_sections.difference(Template.VALID_WS_SECTIONS)
        assert (
            not unknown_props
        ), f"unknown worksheet sections {unknown_props} - only {Template.VALID_WS_SECTIONS} supported"

    @staticmethod
    def from_type(template_type: str):
        """Load a Template from a template type, e.g., "pbmc" or "wes"."""
        try:
            schema_path = _TEMPLATE_PATH_MAP[template_type]
        except KeyError:
            raise NotImplementedError(f"unknown template type: {template_type}")
        return Template.from_json(schema_path, type=template_type)

    @staticmethod
    def from_json(
        template_schema_path: str, schema_root: str = SCHEMA_DIR, type: str = ""
    ):
        """
        Load a Template from a template schema.

        Arguments:
            template_schema_path {str} -- path to the template schema file
            schema_root {str} -- path to the directory where all schemas are stored
        """
        template_schema = _load_dont_validate_schema(template_schema_path, schema_root)

        return Template(
            template_schema,
            type=type
            or os.path.basename(template_schema_path).partition("_template.json")[0],
        )

    def to_excel(self, xlsx_path: str, close: bool = True):
        """
        Write this `Template` to an Excel file

        Arguments:
            xlsx_path {str} -- desired output path of the resulting xlsx file
            close {bool} = True -- whether to close the workbook or to return it
        """
        from .template_writer import XlTemplateWriter

        return XlTemplateWriter().write(xlsx_path, self, close=close)

    def validate_excel(self, xlsx: Union[str, BinaryIO]) -> bool:
        """Validate the given Excel file (either a path or an open file) against this `Template`"""
        from .template_reader import XlTemplateReader

        xlsx, errs = XlTemplateReader.from_excel(xlsx)
        if errs:
            return False
        return xlsx.validate(self)

    def iter_errors_excel(self, xlsx: Union[str, BinaryIO]) -> List[str]:
        """Produces all validation errors the given Excel file (either a path or an open file) against this `Template`"""
        from .template_reader import XlTemplateReader

        xlsx, errs = XlTemplateReader.from_excel(xlsx)
        if errs:
            return errs
        return xlsx.iter_errors(self)
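

# Illustrative end-to-end usage sketch (the file names below are hypothetical):
#
#   template = Template.from_type("pbmc")
#   template.to_excel("pbmc_template.xlsx")
#   template.validate_excel("filled_pbmc_manifest.xlsx")
#   # -> True or False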