CIMAC-CIDC/cidc-schemas
cidc_schemas/unprism.py

"""Tools from extracting information from trial metadata blobs."""
from io import StringIO, BytesIO
from typing import Callable, Dict, List, NamedTuple, Optional, Union

import pandas as pd

from . import prism
from .util import participant_id_from_cimac


class DeriveFilesContext(NamedTuple):
    trial_metadata: dict
    upload_type: str
    # fetch_artifact:
    #   * should return None if no artifact is found.
    #   * arg1 (str): object_url
    #   * arg2 (bool): if True, return artifact as StringIO, otherwise BytesIO.
    fetch_artifact: Callable[[str, bool], Optional[Union[StringIO, BytesIO]]]
    # TODO: add new attributes as needed?

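# A minimal fetch_artifact sketch (hypothetical, for illustration only), assuming
# artifacts live on local disk under some `root` directory:
#
#   import os
#
#   def fetch_from_disk(object_url: str, as_text: bool):
#       path = os.path.join(root, object_url)
#       if not os.path.exists(path):
#           return None  # contract: None when no artifact is found
#       if as_text:
#           with open(path, "r") as f:
#               return StringIO(f.read())
#       with open(path, "rb") as f:
#           return BytesIO(f.read())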

class Artifact(NamedTuple):
    object_url: str
    data: Union[str, bytes]
    file_type: str
    data_format: str
    metadata: Optional[dict]


class DeriveFilesResult(NamedTuple):
    artifacts: List[Artifact]
    trial_metadata: dict


_upload_type_derivations: Dict[
    str, Callable[[DeriveFilesContext], DeriveFilesResult]
] = {}


def _register_derivation(upload_type: str):
    """Bind an upload type to a function that generates its file derivations."""

    def decorator(f):
        _upload_type_derivations[upload_type] = f
        return f

    return decorator
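
# Hypothetical usage sketch (the "new_assay" upload type and record path are
# illustrative, not part of this module): registering a derivation makes
# derive_files() dispatch to it for that upload type.
#
#   @_register_derivation("new_assay")
#   def _new_assay_derivation(context: DeriveFilesContext) -> DeriveFilesResult:
#       records = pd.json_normalize(
#           data=context.trial_metadata,
#           record_path=["assays", "new_assay", "records"],
#       )
#       return DeriveFilesResult(
#           [
#               _build_artifact(
#                   context,
#                   file_name="records.csv",
#                   data=records.to_csv(index=False),
#                   file_type="new assay records",
#                   data_format="csv",
#               )
#           ],
#           context.trial_metadata,
#       )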


def derive_files(context: DeriveFilesContext) -> Optional[DeriveFilesResult]:
    """
    Derive files from a trial_metadata blob given an `upload_type`.

    Parameters
    ----------
    context: DeriveFilesContext(NamedTuple)
        with attributes:
            trial_metadata: dict
            upload_type: str
            fetch_artifact: Callable[[str, bool], Optional[Union[StringIO, BytesIO]]]
              * should return None if no artifact is found.
              * arg1 (str): object_url
              * arg2 (bool): if True, return artifact as StringIO, otherwise BytesIO.

    Returns
    -------
    Optional[DeriveFilesResult(NamedTuple)]
        if not None, with attributes:
            artifacts: List[Artifact]
            trial_metadata: dict

        None if context.upload_type has no registered file derivation.
        All prism.SUPPORTED_SHIPPING_MANIFESTS are handled via
        _shipping_manifest_derivation(); other upload types are registered
        with the @_register_derivation(upload_type) decorator.
    """
    if context.upload_type in prism.SUPPORTED_SHIPPING_MANIFESTS:
        return _shipping_manifest_derivation(context)

    if context.upload_type in _upload_type_derivations:
        return _upload_type_derivations[context.upload_type](context)


def _build_artifact(
    context: DeriveFilesContext,
    file_name: str,
    data: Union[str, bytes],
    file_type: str,
    data_format: str,
    metadata: Optional[dict] = None,
    include_upload_type: bool = False,
) -> Artifact:
    """Generate an Artifact object for the given arguments within a DeriveFilesContext."""
    trial_id = context.trial_metadata[prism.PROTOCOL_ID_FIELD_NAME]

    if include_upload_type:
        object_url = f"{trial_id}/{context.upload_type}/{file_name}"
    else:
        object_url = f"{trial_id}/{file_name}"

    return Artifact(
        object_url=object_url,
        data=data,
        file_type=file_type,
        data_format=data_format,
        metadata=metadata,
    )


def _shipping_manifest_derivation(context: DeriveFilesContext) -> DeriveFilesResult:
    """Generate files derived from a shipping manifest upload."""
    participants = pd.json_normalize(
        data=context.trial_metadata,
        record_path=["participants"],
        meta=[prism.PROTOCOL_ID_FIELD_NAME],
    )
    samples = pd.json_normalize(
        data=context.trial_metadata,
        record_path=["participants", "samples"],
        meta=[prism.PROTOCOL_ID_FIELD_NAME, ["participants", "cimac_participant_id"]],
    )
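
    # For illustration (not real data), and assuming
    # prism.PROTOCOL_ID_FIELD_NAME == "protocol_identifier", a blob like
    #   {"protocol_identifier": "test_trial",
    #    "participants": [{"cimac_participant_id": "CTTTPP1",
    #                      "samples": [{"cimac_id": "CTTTPP101.00"}]}]}
    # flattens `samples` into columns:
    #   cimac_id | protocol_identifier | participants.cimac_participant_id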

    participants.drop("samples", axis=1, inplace=True, errors="ignore")

    participants_csv = participants.to_csv(index=False)
    samples_csv = samples.to_csv(index=False)

    return DeriveFilesResult(
        [
            _build_artifact(
                context,
                file_name="participants.csv",
                file_type="participants info",
                data_format="csv",
                data=participants_csv,
            ),
            _build_artifact(
                context,
                file_name="samples.csv",
                file_type="samples info",
                data_format="csv",
                data=samples_csv,
            ),
        ],
        context.trial_metadata,  # return metadata without updates
    )


@_register_derivation("ihc")
def _ihc_derivation(context: DeriveFilesContext) -> DeriveFilesResult:
    """Generate a combined CSV for IHC data"""
    combined = pd.json_normalize(
        data=context.trial_metadata,
        record_path=["assays", "ihc", "records"],
        meta=[prism.PROTOCOL_ID_FIELD_NAME],
    )

    # remove all artifact related columns
    combined.drop(
        [c for c in combined.columns if c.startswith("files.")],
        axis=1,
        inplace=True,
        errors="ignore",
    )

    combined_csv = combined.to_csv(index=False)

    return DeriveFilesResult(
        [
            _build_artifact(
                context,
                "combined.csv",
                combined_csv,
                "ihc marker combined",
                "csv",
                include_upload_type=True,
            )
        ],
        context.trial_metadata,  # return metadata without updates
    )


@_register_derivation("olink")
def _olink_derivation(context: DeriveFilesContext) -> DeriveFilesResult:
    """Generate a single analysis-ready NPX file across the entire trial for only samples/analytes"""
    olink = context.trial_metadata.get("assays", {}).get("olink", {})

    def download_and_parse_npx(npx_url: str) -> Optional[pd.DataFrame]:
        npx_stream = context.fetch_artifact(npx_url, False)
        if npx_stream:
            # NPX (xlsx) format is shown on page 12 of
            # olink.com/content/uploads/2017/11/1078-v1.0-Olink-NPX-Manager-User-Guide_final.pdf
            df = pd.read_excel(
                npx_stream,
                # first 3 rows aren't needed for a single panel
                header=3,  # this is the `Assay` row
                index_col=0,  # these are the sample ids
                engine="openpyxl",  # default engine doesn't handle xlsx
            )
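
            # Roughly, the parsed single-panel sheet looks like (sketch):
            #
            #                 Assay1    Assay2   ...
            #   Uniprot ID    P01234    Q05678
            #   OlinkID       OID00001  OID00002
            #   LOD           0.5       1.2
            #   <sample id>   3.1       2.7
            #
            # so metadata rows and sample rows share one index until filtered below.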

            # normalize spacing / capitalization when locating the Olink ID row,
            # since some data has 'OlinkID' while the standard calls for 'Olink ID'
            olink_id = df.index[df.index.str.lower().str.replace(" ", "") == "olinkid"][
                0
            ]

            df.columns = pd.MultiIndex.from_tuples(
                [
                    (
                        c,  # this is `Assay` due to header=3 above
                        df.loc["Uniprot ID", c],
                        df.loc[olink_id, c],
                        df.loc["LOD", c],
                    )
                    for c in df.columns
                ],
                names=["Assay", "Uniprot ID", "Olink ID", "LOD"],
            )
            df = df[
                [
                    c
                    for c in df.columns
                    # keep only columns whose Olink ID has the form OIDnnnnn
                    if isinstance(c[2], str) and c[2].startswith("OID")
                ]
            ]
            df.index.name = None
            return df.filter(
                # keep only rows whose index matches the CIMAC ID pattern
                regex=r"^C[A-Z0-9]{3}[A-Z0-9]{3}[A-Z0-9]{2}\.[0-9]{2}$",
                axis=0,
            )

        return None

    return_files: Dict[str, pd.DataFrame] = {}
    if "object_url" in olink.get("study", {}).get("npx_file", {}):
        study_npx = olink["study"]["npx_file"]
        return_files["study_wide"] = download_and_parse_npx(study_npx["object_url"])
    else:
        for batch in olink.get("batches", []):
            if "combined" in batch:
                batch_npx = batch["combined"]["npx_file"]
                return_files[batch["batch_id"]] = download_and_parse_npx(
                    batch_npx["object_url"]
                )
            elif len(batch.get("records", [])) == 1:
                chip = batch["records"][0]
                chip_npx = chip["files"]["assay_npx"]
                return_files[chip["chip_barcode"]] = download_and_parse_npx(
                    chip_npx["object_url"]
                )
            else:
                raise Exception(
                    f"Olink for {context.trial_metadata.get(prism.PROTOCOL_ID_FIELD_NAME)} "
                    f"batch {batch['batch_id']} has multiple chips but no batch-level summary file."
                )

    return DeriveFilesResult(
        [
            _build_artifact(
                context,
                file_name=f"all_samples_npx.{covers}.csv",
                data=df.to_csv(),
                file_type="csv",
                data_format="npx|analysis_ready",
                include_upload_type=True,
            )
            for covers, df in return_files.items()
        ],
        context.trial_metadata,  # return metadata without updates
    )


@_register_derivation("wes_analysis")
def _wes_analysis_derivation(context: DeriveFilesContext) -> DeriveFilesResult:
    """Generate a combined MAF file for an entire trial"""
    # Extract all run-level MAF URLs for this trial
    runs = pd.json_normalize(
        data=context.trial_metadata,
        record_path=["analysis", "wes_analysis", "pair_runs"],
    )
    maf_urls = runs["somatic.maf_tnscope_filter.object_url"]

    def download_and_parse_maf(maf_url: str) -> Optional[pd.DataFrame]:
        maf_stream = context.fetch_artifact(maf_url, True)
        if maf_stream:
            # The first row typically holds a version comment (e.g. "#version ..."), not headers, so skip it
            return pd.read_csv(maf_stream, sep="\t", skiprows=1)
        return None

    # Download all sample-level MAF files as dataframes
    maf_dfs = maf_urls.apply(download_and_parse_maf)

    # Combine all sample-level MAF dataframes
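    # (pd.concat silently drops None entries, i.e. MAFs that failed to
    # download, and raises ValueError only if every entry is None)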
    combined_maf_df = pd.concat(maf_dfs.values, join="outer")

    # Write the combined dataframe to tab-separated string
    combined_maf = combined_maf_df.to_csv(sep="\t", index=False)

    return DeriveFilesResult(
        [
            _build_artifact(
                context,
                file_name="combined.maf",
                data=combined_maf,
                file_type="combined maf",
                data_format="maf",
                include_upload_type=True,
            )
        ],
        context.trial_metadata,  # return metadata without updates
    )


@_register_derivation("cytof_analysis")
def _cytof_analysis_derivation(context: DeriveFilesContext) -> DeriveFilesResult:
    """Generate a combined CSV for CyTOF analysis data"""
    cell_counts_analysis_csvs = pd.json_normalize(
        data=context.trial_metadata,
        record_path=["assays", "cytof", "records"],
        meta=[prism.PROTOCOL_ID_FIELD_NAME],
    )

    artifacts = []
    for combined_f_kind in [
        "cell_counts_assignment",
        "cell_counts_compartment",
        "cell_counts_profiling",
    ]:
        res_df = pd.DataFrame()
        for index, row in cell_counts_analysis_csvs.iterrows():
            obj_url = row[f"output_files.{combined_f_kind}.object_url"]

            cell_counts_csv = context.fetch_artifact(obj_url, True)

            if not cell_counts_csv:
                raise Exception(
                    f"Failed to read {obj_url} building Cytof analysis derivation"
                )

            df = pd.read_csv(cell_counts_csv)

            # Each cell_counts_... file consists of records for just one sample.
            # The first column of each cell_counts_csv (CellSubset) contains cell group types
            # and the second contains counts for those types.
            # Create a new, transposed dataframe with cell group types as column headers
            # and a single row of cell count data.
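            # For illustration (made-up numbers), a cell_counts CSV like
            #   ,CellSubset,N
            #   0,CD4 T cells,100
            #   1,CD8 T cells,50
            # becomes, after set_index/drop/transpose, one row indexed "N":
            #   CellSubset | CD4 T cells | CD8 T cells
            #   N          | 100         | 50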
            df = df.set_index("CellSubset")
            # Cell counts files contain an unnamed index column
            df = df.drop(columns="Unnamed: 0")
            df = df.transpose()

            # add metadata so we can distinguish different samples
            df = df.rename(index={"N": row["cimac_id"]})
            df["cimac_id"] = row["cimac_id"]
            df["cimac_participant_id"] = participant_id_from_cimac(row["cimac_id"])
            df[prism.PROTOCOL_ID_FIELD_NAME] = row[prism.PROTOCOL_ID_FIELD_NAME]

            # finally combine them
            res_df = pd.concat([res_df, df])

        # and add as artifact
        artifacts.append(
            _build_artifact(
                context=context,
                file_name=f"combined_{combined_f_kind}.csv",
                data=res_df.to_csv(index=False),
                data_format="csv",  # confusing, but right
                file_type=combined_f_kind.replace("_", " "),
                include_upload_type=True,
            )
        )

    return DeriveFilesResult(
        artifacts, context.trial_metadata  # return metadata without updates
    )