vanheeringen-lab/gimmemotifs

View on GitHub
gimmemotifs/motif/denovo.py

Summary

Maintainability
A
0 mins
Test Coverage
C
70%
# Copyright (c) 2009-2019 Simon van Heeringen <simon.vanheeringen@gmail.com>
#
# This module is free software. You can redistribute it and/or modify it under
# the terms of the MIT License, see the file COPYING included with this
# distribution.
"""De novo motif prediction.

This module contains functions to predict *de novo* motifs using one or more
*de novo* motif tools. The main function is `gimme_motifs`, which is likely
the only method that you'll need from this module.

Examples
--------
from gimmemotifs.motif import gimme_motifs

peaks = "Gm12878.CTCF.top500.w200.fa"
outdir = "CTCF.gimme"
params = {"tools": "Homer,BioProspector", "genome": "hg38"}

motifs = gimme_motifs(peaks, outdir, params=params)
"""
import datetime
import logging
import logging.handlers
import os
import shutil
import sys

import numpy as np
from genomepy import Genome

from gimmemotifs import mytmpdir
from gimmemotifs.background import (
    MarkovFasta,
    MatchedGcFasta,
    PromoterFasta,
    RandomGenomicFasta,
)
from gimmemotifs.config import BG_RANK, MotifConfig, parse_denovo_params
from gimmemotifs.fasta import Fasta
from gimmemotifs.motif.cluster import cluster_motifs_with_report
from gimmemotifs.motif.prediction import predict_motifs
from gimmemotifs.motif.read import read_motifs
from gimmemotifs.motif.validation import check_denovo_input
from gimmemotifs.report import create_denovo_motif_report
from gimmemotifs.stats import calc_stats, rank_motifs, write_stats
from gimmemotifs.utils import (
    divide_fa_file,
    divide_file,
    narrowpeak_to_bed,
    write_equalsize_bedfile,
)

logger = logging.getLogger("gimme.denovo")


def prepare_denovo_input_narrowpeak(inputfile, params, outdir):
    """Prepare a narrowPeak file for de novo motif prediction.

    All regions to same size; split in test and validation set;
    converted to FASTA.

    Parameters
    ----------
    inputfile : str
        BED file with input regions.

    params : dict
        Dictionary with parameters.

    outdir : str
        Output directory to save files.
    """
    bedfile = os.path.join(outdir, f"{os.path.basename(inputfile)}_to.bed")
    size = int(params["size"])
    narrowpeak_to_bed(inputfile, bedfile, size=size)

    prepare_denovo_input_bed(bedfile, params, outdir)


def prepare_denovo_input_bed(inputfile, params, outdir):
    """Prepare a BED file for de novo motif prediction.

    All regions to same size; split in test and validation set;
    converted to FASTA.

    Parameters
    ----------
    inputfile : str
        BED file with input regions.

    params : dict
        Dictionary with parameters.

    outdir : str
        Output directory to save files.
    """
    # output files
    pred_fa = os.path.join(outdir, "prediction.fa")
    val_fa = os.path.join(outdir, "validation.fa")
    loc_fa = os.path.join(outdir, "localization.fa")

    bedfile = os.path.join(outdir, "input.bed")
    pred_bed = os.path.join(outdir, "prediction.bed")
    val_bed = os.path.join(outdir, "validation.bed")

    # Create BED file with regions of equal size
    size = int(params["size"])
    write_equalsize_bedfile(inputfile, size, bedfile)

    abs_max = int(params["abs_max"])
    fraction = float(params["fraction"])
    with open(bedfile) as f:
        n_regions = len(f.readlines())

    if n_regions < 500 and fraction <= 0.2:
        logger.warning(
            f"You have {n_regions} input regions and only "
            f"{int(fraction * n_regions)} will be used for motif prediction."
        )
        logger.warning(
            "You may consider to increase the fraction for prediction (-f, --fraction)"
        )

    # Split input into prediction and validation set
    logger.debug(
        f"Splitting {bedfile} into prediction set ({pred_bed}) "
        f"and validation set ({val_bed})"
    )
    divide_file(bedfile, pred_bed, val_bed, fraction, abs_max)

    # convert BED files to FASTA files
    genome = Genome(params["genome"])
    genome.track2fasta(pred_bed, pred_fa)
    genome.track2fasta(val_bed, val_fa)

    # Create file for location plots
    lsize = int(params["lsize"])
    extend = (lsize - size) // 2
    genome.track2fasta(
        val_bed,
        loc_fa,
        extend_up=extend,
        extend_down=extend,
        stranded=params["use_strand"],
    )


def prepare_denovo_input_fa(inputfile, params, outdir):
    """Create all the FASTA files for de novo motif prediction and validation.

    Parameters
    ----------
    """
    # output files
    pred_fa = os.path.join(outdir, "prediction.fa")
    val_fa = os.path.join(outdir, "validation.fa")
    loc_fa = os.path.join(outdir, "localization.fa")

    fraction = float(params["fraction"])
    abs_max = int(params["abs_max"])
    n_regions = len(Fasta(inputfile))

    if n_regions < 500 and fraction <= 0.2:
        logger.warning(
            f"You have {n_regions} input regions and only "
            f"{int(fraction * n_regions)} will be used for motif prediction."
        )
        logger.warning(
            "You may consider to increase the fraction for prediction (-f, --fraction)"
        )

    # Split inputfile in prediction and validation set
    logger.debug(
        f"Splitting {inputfile} into prediction set ({pred_fa}) "
        f"and validation set ({val_fa})"
    )
    divide_fa_file(inputfile, pred_fa, val_fa, fraction, abs_max)

    # Create localization set for location plots
    shutil.copy(val_fa, loc_fa)
    seqs = Fasta(loc_fa).seqs
    lsize = len(seqs[0])
    all_same_size = not (False in [len(seq) == lsize for seq in seqs])
    if not all_same_size:
        logger.warning(
            "PLEASE NOTE: FASTA file contains sequences of different sizes. "
            "Positional preference plots might be incorrect!"
        )


def create_background(
    bg_type,
    fafile,
    outfile,
    genome="hg18",
    size=200,
    nr_times=10,
    custom_background=None,
):
    """Create background of a specific type.

    Parameters
    ----------
    bg_type : str
        Name of background type.

    fafile : str
        Name of input FASTA file.

    outfile : str
        Name of output FASTA file.

    genome : str, optional
        Genome name.

    size : int, optional
        Size of regions.

    nr_times : int, optional
        Generate this times as many background sequences as compared to
        input file.

    Returns
    -------
    nr_seqs  : int
        Number of sequences created.
    """
    size = int(size)
    config = MotifConfig()
    fg = Fasta(fafile)

    if bg_type in ["genomic", "gc"]:
        if not genome:
            logger.error("Need a genome to create background")
            sys.exit(1)

    if bg_type == "random":
        f = MarkovFasta(fg, k=1, n=nr_times * len(fg))
        logger.debug(f"Random background: {outfile}")
    elif bg_type == "genomic":
        logger.debug("Creating genomic background")
        f = RandomGenomicFasta(genome, size, nr_times * len(fg))
    elif bg_type == "gc":
        logger.debug("Creating GC matched background")
        f = MatchedGcFasta(fafile, genome, nr_times * len(fg))
        logger.debug(f"GC matched background: {outfile}")
    elif bg_type == "promoter":
        gene_file = Genome(genome).annotation_bed_file
        if not gene_file:
            gene_file = os.path.join(config.get_gene_dir(), f"{genome}.bed")
        if not os.path.exists(gene_file):
            logger.error(f"Could not find a gene file for genome {genome}")
            logger.error("Did you use the --annotation flag for genomepy?")
            logger.error(
                f"Alternatively make sure there is a file called {genome}.bed "
                f"in {config.get_gene_dir()}"
            )
            sys.exit(1)

        logger.info(
            f"Creating random promoter background ({genome}, using genes in {gene_file})"
        )
        f = PromoterFasta(gene_file, genome, size, nr_times * len(fg))
        logger.debug(f"Random promoter background: {outfile}")
    elif bg_type == "custom":
        bg_file = custom_background
        if not bg_file:
            raise ValueError("Background file not specified!")

        if not os.path.exists(bg_file):
            raise FileNotFoundError(f"Custom background file {bg_file} does not exist!")
        else:
            logger.info(f"Copying custom background file {bg_file} to {outfile}.")
            f = Fasta(bg_file)
            median_length = np.median([len(seq) for seq in f.seqs])
            if median_length < (size * 0.95) or median_length > (size * 1.05):
                logger.warning(
                    f"The custom background file {bg_file} contains sequences with a "
                    f"median size of {median_length}, while GimmeMotifs predicts motifs in sequences "
                    f"of size {size}. This will influence the statistics! It is recommended "
                    "to use background sequences of the same size."
                )
    else:
        raise ValueError

    f.writefasta(outfile)
    return len(f)


def create_backgrounds(
    outdir, background=None, genome="hg38", size=200, custom_background=None
):
    """Create different backgrounds for motif prediction and validation.

    Parameters
    ----------
    outdir : str
        Directory to save results.

    background : list, optional
        Background types to create, default is 'random'.

    genome : str, optional
        Genome name (for genomic and gc backgrounds).

    size : int, optional
        Size of background regions

    Returns
    -------
    bg_info : dict
        Keys: background name, values: file name.
    """
    if background is None:
        background = ["random"]
        nr_sequences = {}

    # Create background for motif prediction
    if "gc" in background:
        pred_bg = "gc"
    else:
        pred_bg = background[0]

    create_background(
        pred_bg,
        os.path.join(outdir, "prediction.fa"),
        os.path.join(outdir, "prediction.bg.fa"),
        genome=genome,
        size=size,
        custom_background=custom_background,
    )

    # Get background fasta files for statistics
    bg_info = {}
    nr_sequences = {}
    for bg in background:
        fname = os.path.join(outdir, f"bg.{bg}.fa")
        nr_sequences[bg] = create_background(
            bg,
            os.path.join(outdir, "validation.fa"),
            fname,
            genome=genome,
            size=size,
            custom_background=custom_background,
        )

        bg_info[bg] = fname
    return bg_info


def _is_significant(stats, metrics=None):
    """Filter significant motifs based on several statistics.

    Parameters
    ----------
    stats : dict
        Statistics disctionary object.

    metrics : sequence
        Metric with associated minimum values. The default is
        (("max_enrichment", 3), ("roc_auc", 0.55), ("enr_at_fpr", 0.55))

    Returns
    -------
    significant : bool
    """
    if metrics is None:
        metrics = (("max_enrichment", 3), ("roc_auc", 0.55), ("enr_at_fpr", 0.55))

    for stat_name, min_value in metrics:
        if stats.get(stat_name, 0) < min_value:
            return False

    return True


def filter_significant_motifs(fname, result, bg, metrics=None):
    """Filter significant motifs based on several statistics.

    Parameters
    ----------
    fname : str
        Filename of output file were significant motifs will be saved.

    result : PredictionResult instance
        Contains motifs and associated statistics.

    bg : str
        Name of background type to use.

    metrics : sequence
        Metric with associated minimum values. The default is
        (("max_enrichment", 3), ("roc_auc", 0.55), ("enr_at_f[r", 0.55))

    Returns
    -------
    motifs : list
        List of Motif instances.
    """
    sig_motifs = []
    with open(fname, "w") as f:
        for motif in result.motifs:
            stats = result.stats.get(f"{motif.id}_{motif.to_consensus()}", {}).get(
                bg, {}
            )
            if _is_significant(stats, metrics):
                f.write(f"{motif.to_pfm()}\n")
                sig_motifs.append(motif)

    logger.info(f"{len(sig_motifs)} motifs are significant")
    logger.debug(f"written to {fname}")

    return sig_motifs


def best_motif_in_cluster(
    single_pwm,
    clus_pwm,
    clusters,
    fg_fa,
    background,
    genome,
    stats=None,
    metrics=("roc_auc", "recall_at_fdr"),
):
    """Return the best motif per cluster for a clustering results.

    The motif can be either the average motif or one of the clustered motifs.

    Parameters
    ----------
    single_pwm : str
        Filename of motifs.

    clus_pwm : str
        Filename of motifs.

    clusters :
        Motif clustering result.

    fg_fa : str
        Filename of FASTA file.

    background : dict
        Dictionary for background file names.

    genome : str
        Genome name.

    stats : dict, optional
        If statistics are not supplied they will be computed.

    metrics : sequence, optional
        Metrics to use for motif evaluation. Default are "roc_auc" and
        "recall_at_fdr".

    Returns
    -------
    motifs : list
        List of Motif instances.
    """
    # combine original and clustered motifs
    motifs = read_motifs(single_pwm) + read_motifs(clus_pwm)
    motifs = dict([(str(m), m) for m in motifs])

    # get the statistics for those motifs that were not yet checked
    eval_motifs = []
    for clus, singles in clusters:
        for motif in set([clus] + singles):
            if str(motif) not in stats:
                eval_motifs.append(motifs[str(motif)])

    if len(eval_motifs) > 0:
        new_stats = {}
        for bg, bg_fa in background.items():
            for m, s in calc_stats(
                fg_file=fg_fa, bg_file=bg_fa, motifs=eval_motifs, genome=genome
            ).items():
                if m not in new_stats:
                    new_stats[m] = {}
                new_stats[m][bg] = s
        stats.update(new_stats)

    rank = rank_motifs(stats, metrics)

    # rank the motifs
    best_motifs = []
    for clus, singles in clusters:
        if len(singles) > 1:
            eval_motifs = singles
            if clus not in motifs:
                eval_motifs.append(clus)
            eval_motifs = [motifs[str(e)] for e in eval_motifs]
            best_motif = sorted(eval_motifs, key=lambda x: rank[str(x)])[-1]
            best_motifs.append(best_motif)
        else:
            best_motifs.append(clus)
        for bg in background:
            stats[str(best_motifs[-1])][bg]["num_cluster"] = len(singles)

    best_motifs = sorted(best_motifs, key=lambda x: rank[str(x)], reverse=True)
    return best_motifs


def rename_motifs(motifs, stats=None):
    """Rename motifs to GimmeMotifs_1..GimmeMotifs_N.

    If stats object is passed, stats will be copied."""
    final_motifs = []
    for i, motif in enumerate(motifs):
        old = str(motif)
        motif.id = f"GimmeMotifs_{i + 1}"
        final_motifs.append(motif)
        if stats:
            stats[str(motif)] = stats[old].copy()

    if stats:
        return final_motifs, stats
    else:
        return final_motifs


def gimme_motifs(
    inputfile,
    outdir,
    params=None,
    filter_significant=True,
    cluster=True,
    create_report=True,
):
    """De novo motif prediction based on an ensemble of different tools.

    Parameters
    ----------
    inputfile : str
        Filename of input. Can be either BED, narrowPeak or FASTA.

    outdir : str
        Name of output directory.

    params : dict, optional
        Optional parameters.

    filter_significant : bool, optional
        Filter motifs for significance using the validation set.

    cluster : bool, optional
        Cluster similar predicted (and significant) motifs.

    create_report : bool, optional
        Create output reports (both .txt and .html).

    Returns
    -------
    motifs : list
        List of predicted motifs.

    Examples
    --------

    >>> from gimmemotifs.motif import gimme_motifs
    >>> gimme_motifs("input.fa", "motifs.out")
    """
    if outdir is None:
        outdir = f"gimmemotifs_{datetime.date.today().strftime('%d_%m_%Y')}"

    # Create output directories
    tmpdir = os.path.join(outdir, "intermediate")
    for d in [outdir, tmpdir]:
        if not os.path.exists(d):
            os.mkdir(d)

    # Log to file
    logger = logging.getLogger("gimme")
    logfile = os.path.join(outdir, "gimmemotifs.log")
    fh = logging.FileHandler(logfile, "w")
    fh.setLevel(logging.DEBUG)
    file_formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    fh.setFormatter(file_formatter)
    logger.addHandler(fh)
    logger = logging.getLogger("gimme.denovo")

    # Initialize parameters
    params = parse_denovo_params(params)

    # Check the input files
    input_type, background = check_denovo_input(inputfile, params)

    logger.info("starting full motif analysis")
    logger.debug(f"Using temporary directory {mytmpdir()}")

    params["size"] = int(params["size"])
    if params["size"] > 0:
        logger.info(
            f"using size of {params['size']}, set size to 0 to use original region size"
        )
    else:
        logger.info("using original size")

    # Create the necessary files for motif prediction and validation
    if not 0 < float(params["fraction"]) < 1:
        logger.error("Please specify a fraction between 0 and 1")
        sys.exit(1)
    if input_type == "bed":
        logger.info("preparing input from BED")
        prepare_denovo_input_bed(inputfile, params, tmpdir)
    elif input_type == "narrowpeak":
        logger.info("preparing input from narrowPeak")
        prepare_denovo_input_narrowpeak(inputfile, params, tmpdir)
    elif input_type == "fasta":
        logger.info("preparing input from FASTA")
        prepare_denovo_input_fa(inputfile, params, tmpdir)
    else:
        logger.error("unknown input file format!")
        sys.exit(1)

    # Create the background FASTA files
    background = create_backgrounds(
        tmpdir,
        background,
        params.get("genome", None),
        params["size"],
        params.get("custom_background", None),
    )

    # Predict de novo motifs
    result = predict_motifs(
        os.path.join(tmpdir, "prediction.fa"),
        os.path.join(tmpdir, "prediction.bg.fa"),
        os.path.join(tmpdir, "all_motifs.pfm"),
        params=params,
        stats_fg=os.path.join(tmpdir, "validation.fa"),
        stats_bg=background,
    )

    if len(result.motifs) == 0:
        logger.info("de novo finished")
        return []

    # Write statistics
    stats_file = os.path.join(tmpdir, "stats.{}.txt")
    write_stats(result.stats, stats_file)

    bg = sorted(background, key=lambda x: BG_RANK[x])[0]
    if filter_significant:
        motifs = filter_significant_motifs(
            os.path.join(tmpdir, "significant_motifs.pfm"), result, bg
        )
        if len(motifs) == 0:
            logger.info("no significant motifs")
            return []

        pfmfile = os.path.join(tmpdir, "significant_motifs.pfm")
    else:
        logger.info("not filtering for significance")
        motifs = result.motifs
        pfmfile = os.path.join(tmpdir, "all_motifs.pfm")

    if cluster:
        clusters = cluster_motifs_with_report(
            pfmfile,
            os.path.join(tmpdir, "clustered_motifs.pfm"),
            outdir,
            0.95,
            title=inputfile,
        )

        # Determine best motif in cluster
        best_motifs = best_motif_in_cluster(
            pfmfile,
            os.path.join(tmpdir, "clustered_motifs.pfm"),
            clusters,
            os.path.join(tmpdir, "validation.fa"),
            background,
            params["genome"],
            result.stats,
        )

        final_motifs, stats = rename_motifs(best_motifs, result.stats)
    else:
        logger.info("not clustering")
        rank = rank_motifs(result.stats)
        sorted_motifs = sorted(motifs, key=lambda x: rank[str(x)], reverse=True)
        final_motifs, stats = rename_motifs(sorted_motifs, result.stats)

    motifs_found = len(final_motifs) > 0

    if motifs_found:
        with open(os.path.join(outdir, "gimme.denovo.pfm"), "w") as f:
            for m in final_motifs:
                f.write(f"{m.to_ppm()}\n")

    if motifs_found and create_report:
        bg = dict([(b, os.path.join(tmpdir, f"bg.{b}.fa")) for b in background])

        create_denovo_motif_report(
            inputfile,
            os.path.join(outdir, "gimme.denovo.pfm"),
            os.path.join(tmpdir, "validation.fa"),
            bg,
            os.path.join(tmpdir, "localization.fa"),
            outdir,
            params,
            stats,
        )

    with open(os.path.join(outdir, "params.txt"), "w") as f:
        for k, v in params.items():
            f.write(f"{k}\t{v}\n")

    if not (params.get("keep_intermediate")):
        logger.debug(
            "Deleting intermediate files. "
            "Please specifify the -k option if you want to keep these files."
        )
        shutil.rmtree(tmpdir)

    logger.info("de novo finished")
    logger.info(f"output dir: {outdir}")
    if motifs_found and cluster:
        logger.info(f"de novo report: {os.path.join(outdir, 'gimme.denovo.html')}")

    return final_motifs