LucaCappelletti94/epigenomic_dataset
epigenomic_dataset/mine.py
from glob import glob
from multiprocessing import Pool, cpu_count
import os
from typing import Dict, List
from tqdm.auto import tqdm
import numpy as np
import pandas as pd
import csv
import gzip
import warnings
from .extract import load_epigenomes_table, load_accession_path


def compute_header(statistics: Dict[str, bool]) -> str:
    """Return the header for extracted epigenomes.

    Parameters
    ------------------
    statistics: Dict[str, bool],
        The statistics to be computed.

    Returns
    ------------------
    The tab-separated header line, terminated by a newline.
    """
    return "\t".join([
        "chrom", "chromStart", "chromEnd", "strand",
        *[s for s, enabled in statistics.items() if enabled]
    ])+'\n'
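
# A minimal sketch of what compute_header produces; the statistics keys below
# are illustrative assumptions, not the pipeline's canonical configuration:
#
#     compute_header({"mean": True, "var": False, "max": True})
#     # -> "chrom\tchromStart\tchromEnd\tstrand\tmean\tmax\n"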


def get_callback(statistic: str):
    """Return the NaN-aware numpy callback computing the given statistic."""
    return {
        "mean": np.nanmean,
        "var": np.nanvar,
        "max": np.nanmax,
        "min": np.nanmin,
        "median": np.nanmedian
    }[statistic]
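
# Each statistic name maps to a NaN-aware numpy aggregation; a hedged
# illustration (the input values are arbitrary):
#
#     get_callback("mean")([1.0, float("nan"), 3.0])  # -> 2.0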


def get_target_path(root: str, cell_line: str, assembly: str, assay_term_name: str, target: str) -> str:
    """Return path where the target epigenomic data are to be stored.

    Parameters
    -----------------------
    root: str,
        Root from where to search the files
        to concatenate.
    cell_line: str,
        Cell line to consider.
    assembly: str,
        The genomic assembly of the data to be retrieved.
    assay_term_name: str,
        Name of the experiment.
    target: str,
        The name of the genomic target.

    Returns
    -----------------------
    The path where to store the epigenomic data.
    """
    return "{root}/{cell_line}/{file_name}.csv.gz".format(
        root=root,
        cell_line=cell_line,
        assembly=assembly,
        file_name=assay_term_name if target == "Unknown" else target
    )
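
# A hedged example of the resulting layout; the root, cell line, assay and
# target below are hypothetical:
#
#     get_target_path("epigenomes", "K562", "hg38", "ChIP-seq", "CTCF")
#     # -> "epigenomes/K562/CTCF.csv.gz"
#     get_target_path("epigenomes", "K562", "hg38", "DNase-seq", "Unknown")
#     # -> "epigenomes/K562/DNase-seq.csv.gz"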


def parse_extracted_epigenome(sources: List[str], target: str, statistics: Dict[str, bool]):
    """Parse the given source bed-like file.

    Parameters
    ----------------------------
    sources: List[str],
        Paths from where to load the sources.
    target: str,
        Epigenomic data target.
    statistics: Dict[str, bool]
        Statistics to be extracted.
    """
    header = compute_header(statistics)
    callbacks = [
        get_callback(s)
        for s, enabled in statistics.items()
        if enabled
    ]
    os.makedirs(os.path.dirname(target), exist_ok=True)

    source_files = [
        gzip.open(source, "rt")
        for source in sources
    ]

    readers = [
        csv.reader(source_file, delimiter='\t')
        for source_file in source_files
    ]
    try:
        with gzip.open(target, "wt") as t:
            # Start by writing the header line
            t.write(header)
            # Then parse the aligned rows from all sources, one window at a time
            for rows in zip(*readers):
                # Extract the genomic coordinates and strand from the first source
                chrom, chromStart, chromEnd, _, _, strand = rows[0][:6]
                # Convert the scores to floats, mapping "NA" to NaN
                scores = [
                    [
                        float(s) if s != "NA" else np.nan
                        for s in row[7:]
                    ]
                    for row in rows
                ]
                # Average the scores of each source, guarding against empty or all-NaN windows
                averaged_scores = [
                    np.nan
                    if len(sub_scores) == 0 or np.all(np.isnan(sub_scores))
                    else np.nanmean(sub_scores)
                    for sub_scores in scores
                ]
                # Compute the requested statistics over the per-source averages
                metrics = [
                    str(np.nan)
                    if len(averaged_scores) == 0 or np.all(np.isnan(averaged_scores))
                    else cal(averaged_scores).astype(str)
                    for cal in callbacks
                ]
                # And write the results
                t.write("\t".join([
                    chrom, chromStart, chromEnd, strand,
                    *metrics
                ])+'\n')
    except EOFError:
        warnings.warn((
            "Unable to finish reading the corrupted compressed files {}. "
            "These files are now being deleted: "
            "rerun the pipeline to retrieve them again."
        ).format(
            ", ".join(sources)
        ))
        # Remove the corrupted sources and the partially written target,
        # so that rerunning the pipeline regenerates them from scratch.
        for source in sources:
            os.remove(source)
        if os.path.exists(target):
            os.remove(target)

    for source_file in source_files:
        source_file.close()
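
# A minimal usage sketch, assuming two replicate bed-like files extracted for
# the same target (all paths and the statistics dictionary are hypothetical):
#
#     parse_extracted_epigenome(
#         sources=[
#             "epigenomes/K562/ENCFF000AAA.bed.gz",
#             "epigenomes/K562/ENCFF000BBB.bed.gz"
#         ],
#         target="epigenomes/K562/CTCF.csv.gz",
#         statistics={"mean": True, "max": True, "min": True,
#                     "var": False, "median": False}
#     )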


def _parse_extracted_epigenome(kwargs: Dict):
    """Unpack the keyword arguments, as required by the multiprocessing imap."""
    return parse_extracted_epigenome(**kwargs)


def mine(
    root: str,
    statistics: Dict[str, bool],
    cell_lines: List[str],
    assembly: str
):
    """Extract and saves requested statistics from epigenomic files.

    Parameters
    -----------------------
    root: str,
        Root from where to search the files
        to concatenate.
    statistics: Dict[str, bool],
        Dictionary of statistics to extract from windows.
    cell_lines: List[str],
        Cell lines to consider.
    assembly: str,
        The genomic assembly of the data to be retrieved.
    """
    tasks = [
        {
            "sources": [
                load_accession_path(root, accession)
                for accession in group.accession
            ],
            "target": get_target_path(root, cell_line, assembly, assay_term_name, target),
            "statistics": statistics
        }
        for (cell_line, assay_term_name, target), group in load_epigenomes_table(
            cell_lines, assembly
        ).groupby(["cell_line", "assay_term_name", "target"])
        if not os.path.exists(get_target_path(root, cell_line, assembly, assay_term_name, target))
    ]

    with Pool(cpu_count()) as p:
        # Process each group of source files in parallel, with a progress bar
        list(tqdm(
            p.imap(
                _parse_extracted_epigenome,
                tasks
            ),
            desc="Parse extracted epigenomes",
            total=len(tasks)
        ))
        p.close()
        p.join()
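

if __name__ == "__main__":
    # A hedged usage sketch: the root directory, cell lines, assembly and
    # statistics below are illustrative assumptions rather than values
    # prescribed by the pipeline.
    mine(
        root="epigenomes",
        statistics={
            "mean": True,
            "var": True,
            "max": True,
            "min": True,
            "median": True
        },
        cell_lines=["K562", "GM12878"],
        assembly="hg38"
    )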