gimmemotifs/utils.py
# Copyright (c) 2009-2019 Simon van Heeringen <simon.vanheeringen@gmail.com>
#
# This module is free software. You can redistribute it and/or modify it under
# the terms of the MIT License, see the file COPYING included with this
# distribution.
""" Odds and ends that for which I didn't (yet) find another place """
import logging
import os
import random
import re
import sys
from functools import singledispatch
from io import TextIOWrapper
from subprocess import Popen
from tempfile import NamedTemporaryFile
import numpy as np
import pybedtools
import pyfaidx
import requests
from Bio.SeqIO.FastaIO import SimpleFastaParser
from genomepy import Genome
from gimmemotifs.config import MotifConfig
from gimmemotifs.fasta import Fasta
from gimmemotifs.plot import plot_histogram
from gimmemotifs.rocmetrics import ks_pvalue
logger = logging.getLogger("gimme.utils")
def rc(seq):
"""Return reverse complement of sequence"""
d = str.maketrans("actgACTG", "tgacTGAC")
return seq[::-1].translate(d)
def narrowpeak_to_bed(inputfile, bedfile, size=0):
"""Convert narrowPeak file to BED file."""
p = re.compile(r"^(#|track|browser)")
warn_no_summit = True
with open(bedfile, "w") as f_out, open(inputfile) as f_in:
for line in f_in:
if p.search(line):
continue
vals = line.strip().split("\t")
start, end = int(vals[1]), int(vals[2])
if size > 0:
summit = int(vals[9])
if summit == -1:
if warn_no_summit:
logger.warning(
"No summit present in narrowPeak file, "
"using the peak center."
)
warn_no_summit = False
summit = (end - start) // 2
start = start + summit - (size // 2)
end = start + size
f_out.write(f"{vals[0]}\t{start}\t{end}\t{vals[6]}\n")
def pfmfile_location(infile=None):
"""Return the path to the pfmfile"""
config = MotifConfig()
if infile is None:
infile = config.get_default_params().get("motif_db", None)
if infile is None:
raise ValueError(
"No motif file was given and no default "
"database specified in the config file."
)
if not os.path.exists(infile):
motif_dir = config.get_motif_dir()
checkfile = os.path.join(motif_dir, infile)
if os.path.exists(checkfile):
infile = checkfile
else:
for ext in [".pfm", ".pwm"]:
if os.path.exists(checkfile + ext):
infile = checkfile + ext
break
if not os.path.exists(infile):
raise FileNotFoundError(f"Motif file {infile} not found")
return infile
def get_jaspar_motif_info(motif_id):
query_url = f"http://jaspar.genereg.net/api/v1/matrix/{motif_id}?format=json"
result = requests.get(query_url)
if not result.ok:
result.raise_for_status()
sys.exit()
return result.json()
# def phyper_single(k, good, bad, N):
# lgam = special.gammaln
# return np.exp(
# lgam(good + 1)
# - lgam(good - k + 1)
# - lgam(k + 1)
# + lgam(bad + 1)
# - lgam(bad - N + k + 1)
# - lgam(N - k + 1)
# - lgam(bad + good + 1)
# + lgam(bad + good - N + 1)
# + lgam(N + 1)
# )
#
#
# def phyper(k, good, bad, N):
# """Current hypergeometric implementation in scipy is broken,
# so here's the correct version.
# """
# pvalues = [phyper_single(x, good, bad, N) for x in range(k + 1, N + 1)]
# return np.sum(pvalues)
def divide_file(fname, sample, rest, fraction, abs_max):
with open(fname) as f:
lines = f.readlines()
# random.seed()
random.shuffle(lines)
x = int(fraction * len(lines))
if x > abs_max:
x = abs_max
tmp = NamedTemporaryFile(mode="w", delete=False)
# Fraction as sample
for line in lines[:x]:
tmp.write(line)
tmp.flush()
# Make sure it is sorted for tools that use this information (MDmodule)
stdout, stderr = Popen(
f"sort -k4gr {tmp.name} > {sample}", shell=True
).communicate()
tmp.close()
if stderr:
logger.error(f"Something went wrong.\nstdout: {stdout}\nstderr; {stderr}")
sys.exit()
# Rest
f = open(rest, "w")
for line in lines[x:]:
f.write(line)
f.close()
# if os.path.exists(tmp.name):
# os.unlink(tmp.name)
return x, len(lines[x:])
def divide_fa_file(fname, sample, rest, fraction, abs_max):
fa = Fasta(fname)
ids = fa.ids[:]
x = int(fraction * len(ids))
if x > abs_max:
x = abs_max
sample_seqs = random.sample(ids, x)
# Rest
f_sample = open(sample, "w")
f_rest = open(rest, "w")
for name, seq in fa.items():
if name in sample_seqs:
f_sample.write(f">{name}\n{seq}\n")
else:
f_rest.write(f">{name}\n{seq}\n")
f_sample.close()
f_rest.close()
return x, len(ids[x:])
def write_equalsize_bedfile(bedfile, size, outfile):
"""Read input from <bedfile>, set the size of all entries to <size> and
write the result to <outfile>.
Input file needs to be in BED or WIG format."""
if size is None or size <= 0:
bed = pybedtools.BedTool(bedfile)
filtered_bed = pybedtools.BedTool(
bed.filter(lambda x: len(x) >= 10).saveas().fn
)
if len(bed) != len(filtered_bed):
logger.warning(
"Using original size of input file regions, however, "
"some regions are smaller than 10nt!"
)
logger.warning("Removing all these smaller regions.")
filtered_bed.saveas(outfile)
return
BUFSIZE = 10000
f = open(bedfile)
out = open(outfile, "w")
lines = f.readlines(BUFSIZE)
line_count = 0
while lines:
for line in lines:
line_count += 1
if (
not line.startswith("#")
and not line.startswith("track")
and not line.startswith("browser")
):
vals = line.strip().split("\t")
try:
start, end = int(vals[1]), int(vals[2])
except ValueError:
logger.error(
f"Error on line {line_count} while reading {bedfile}. "
"Is the file in BED or WIG format?"
)
sys.exit(1)
start = (start + end) // 2 - (size // 2)
# This shifts the center, but ensures the size is identical...
# maybe not ideal
if start < 0:
start = 0
end = start + size
# Keep all the other information in the bedfile if it's there
if len(vals) > 3:
rest = "\t".join(vals[3:])
out.write(f"{vals[0]}\t{start}\t{end}\t{rest}\n")
else:
out.write(f"{vals[0]}\t{start}\t{end}\n")
lines = f.readlines(BUFSIZE)
out.close()
f.close()
# def median_bed_len(bedfile):
# f = open(bedfile)
# lengths = []
# for i, line in enumerate(f.readlines()):
# if not (line.startswith("browser") or line.startswith("track")):
# vals = line.split("\t")
# try:
# lengths.append(int(vals[2]) - int(vals[1]))
# except ValueError:
# logger.error(
# f"Error in line {i}: "
# "coordinates in column 2 and 3 need to be integers!"
# )
# sys.exit(1)
# f.close()
# return np.median(lengths)
def motif_localization(fastafile, motif, size, outfile, cutoff=0.9):
NR_HIST_MATCHES = 100
matches = motif.scan(Fasta(fastafile), cutoff=cutoff, nreport=NR_HIST_MATCHES)
if len(matches) > 0:
ar = []
for a in matches.values():
ar += a
matches = np.array(ar)
p = ks_pvalue(matches, size - len(motif))
plot_histogram(
matches - size / 2 + len(motif) / 2,
outfile,
xrange=(-size / 2, size / 2),
breaks=21,
title=f"{motif.id} (p={p:.2e})",
xlabel="Position",
)
return motif.id, p
else:
return motif.id, 1.0
# def _treesort(order, nodeorder, nodecounts, tree):
# # From the Pycluster library, Michiel de Hoon
# # Find the order of the nodes consistent with the hierarchical clustering
# # tree, taking into account the preferred order of nodes.
# nNodes = len(tree)
# nElements = nNodes + 1
# neworder = np.zeros(nElements)
# clusterids = np.arange(nElements)
# for i in range(nNodes):
# i1 = tree[i].left
# i2 = tree[i].right
# if i1 < 0:
# order1 = nodeorder[-i1 - 1]
# count1 = nodecounts[-i1 - 1]
# else:
# order1 = order[i1]
# count1 = 1
# if i2 < 0:
# order2 = nodeorder[-i2 - 1]
# count2 = nodecounts[-i2 - 1]
# else:
# order2 = order[i2]
# count2 = 1
# # If order1 and order2 are equal, their order is determined
# # by the order in which they were clustered
# if i1 < i2:
# if order1 < order2:
# increase = count1
# else:
# increase = count2
# for j in range(nElements):
# clusterid = clusterids[j]
# if clusterid == i1 and order1 >= order2:
# neworder[j] += increase
# if clusterid == i2 and order1 < order2:
# neworder[j] += increase
# if clusterid == i1 or clusterid == i2:
# clusterids[j] = -i - 1
# else:
# if order1 <= order2:
# increase = count1
# else:
# increase = count2
# for j in range(nElements):
# clusterid = clusterids[j]
# if clusterid == i1 and order1 > order2:
# neworder[j] += increase
# if clusterid == i2 and order1 <= order2:
# neworder[j] += increase
# if clusterid == i1 or clusterid == i2:
# clusterids[j] = -i - 1
# return np.argsort(neworder)
def number_of_seqs_in_file(fname):
try:
fa = Fasta(fname)
return len(fa)
except Exception:
pass
try:
bed = pybedtools.BedTool(fname)
return len([x for x in bed])
except Exception:
pass
logger.error(f"unknown filetype {fname}")
sys.exit(1)
def determine_file_type(fname):
"""
Detect file type.
The following file types are supported:
BED, narrowPeak, FASTA, list of chr:start-end regions
If the extension is bed, fa, fasta or narrowPeak, we will believe this
without checking!
Parameters
----------
fname : str
File name.
Returns
-------
filetype : str
Filename in lower-case.
"""
if not (isinstance(fname, str)):
raise ValueError(f"{fname} is not a string, does not represent a file name")
if not os.path.exists(fname):
raise ValueError(f"File {fname} does not exist!")
if not os.path.isfile(fname):
raise ValueError(f"{fname} is not a file!")
ext = os.path.splitext(fname)[1].lower()
if ext in [".bed"]:
return "bed"
elif ext in [".fa", ".fasta"]:
return "fasta"
elif ext in [".narrowpeak"]:
return "narrowpeak"
try:
Fasta(fname)
return "fasta"
except Exception:
pass
# Read first line that is not a comment or an UCSC-specific line
p = re.compile(r"^(#|track|browser)")
with open(fname) as f:
for line in f.readlines():
line = line.strip()
if not p.search(line):
break
region_p = re.compile(r"^(.+):(\d+)-(\d+)$")
if region_p.search(line):
return "region"
else:
vals = line.split("\t")
if len(vals) >= 3:
try:
_, _ = int(vals[1]), int(vals[2])
except ValueError:
return "unknown"
if len(vals) == 10:
try:
_, _ = int(vals[4]), int(vals[9])
return "narrowpeak"
except ValueError:
# As far as I know there is no 10-column BED format
return "unknown"
return "bed"
# Catch-all
return "unknown"
# def get_seqs_type(seqs):
# """
# automagically determine input type
# the following types are detected:
# - Fasta object
# - FASTA file
# - list of regions
# - region file
# - BED file
# """
# if isinstance(seqs, Fasta):
# return "fasta"
# elif isinstance(seqs, list) or isinstance(seqs, np.ndarray):
# if len(seqs) == 0:
# raise ValueError("empty list of sequences to scan")
# else:
# region_p = re.compile(r"^([^\s:]+\@)?(.+):(\d+)-(\d+)$")
# if region_p.search(seqs[0]):
# return "regions"
# else:
# raise ValueError("unknown region type")
# elif isinstance(seqs, str):
# if os.path.isfile(seqs):
# ftype = determine_file_type(seqs)
# if ftype == "unknown":
# raise ValueError("unknown type")
# elif ftype == "narrowpeak":
# raise ValueError("narrowPeak not yet supported in this function")
# else:
# return ftype + "file"
# else:
# raise ValueError(f"no file found with name {seqs}")
# else:
# raise ValueError(f"unknown type {type(seqs).__name__}")
def _check_minsize(fa, minsize):
"""
Raise ValueError if there is any sequence that is shorter than minsize.
If minsize is None the size will not be checked.
"""
if minsize is None:
return fa
for name, seq in fa.items():
if len(seq) < minsize:
raise ValueError(f"sequence {name} is shorter than {minsize}")
return fa
def _genomepy_convert(to_convert, genome, minsize=None):
"""
Convert a variety of inputs using track2fasta().
"""
if genome is None:
raise ValueError("input file is not a FASTA file, need a genome!")
if isinstance(genome, Genome):
g = genome
else:
g = Genome(genome)
tmpfile = NamedTemporaryFile()
try:
g.track2fasta(to_convert, tmpfile.name)
except TypeError:
logger.error("Input file type not recognized!")
logger.error(
"This can happen if you use regions and your regions are not in chrom:start-end format."
)
logger.error(
"Another common issue is spaces instead of tabs, or extra spaces in addition to tabs."
)
sys.exit(1)
fa = as_seqdict(tmpfile.name)
return _check_minsize(fa, minsize)
def _as_seqdict_genome_regions(regions, minsize=None):
"""
Accepts list of regions where the genome is encoded in the region,
using the genome@chrom:start-end format.
"""
genomic_regions = {}
for region in regions:
genome, region = region.split("@")
if genome not in genomic_regions:
Genome(genome)
genomic_regions[genome] = []
genomic_regions[genome].append(region)
tmpfa = NamedTemporaryFile(mode="w", delete=False)
for genome, g_regions in genomic_regions.items():
g = Genome(genome)
fa = g.track2fasta(g_regions)
for seq in fa:
seq.name = f"{genome}@{seq.name}"
print(seq.__repr__(), file=tmpfa)
tmpfa.flush()
# Open tempfile and restore original sequence order
fa = as_seqdict(tmpfa.name)
fa = {region: fa[region] for region in regions}
return _check_minsize(fa, minsize)
def _as_seqdict_sequences(regions, minsize=None):
"""
Accepts list of DNA strings.
"""
fa = dict([(i + 1, regions) for i, regions in enumerate(regions)])
return _check_minsize(fa, minsize)
@singledispatch
def as_seqdict(to_convert, genome=None, minsize=None):
"""
Convert input to a dictionary with name as key and sequence as value.
If the input contains genomic coordinates, the genome needs to be
specified. If minsize is specified all sequences will be checked if they
are not shorter than minsize. If regions (or a region file) are used as
the input, the genome can optionally be specified in the region using the
following format: genome@chrom:start-end.
Current supported input types include:
* FASTA, BED and region files.
* List or numpy.ndarray of regions.
* pyfaidx.Fasta object.
* pybedtools.BedTool object.
Parameters
----------
to_convert : list, str, pyfaidx.Fasta or pybedtools.BedTool
Input to convert to FASTA-like dictionary
genome : str, optional
Genomepy genome name.
minsize : int or None, optional
If specified, check if all sequences have at least size minsize.
Returns
-------
dict with sequence names as key and sequences as value.
"""
raise NotImplementedError(f"Not implement for {type(to_convert)}")
@as_seqdict.register(list)
def _as_seqdict_list(to_convert, genome=None, minsize=None):
"""
Accepts list of regions or DNA strings as input.
"""
# Use regular expression to check for region
# (chr:start-end or genome@chr:start-end)
# regions
region_p = re.compile(r"^[^@]+@([^\s]+):(\d+)-(\d+)$")
if region_p.match(to_convert[0]):
return _as_seqdict_genome_regions(to_convert, minsize)
# DNA
fa_p = re.compile(r"^[actgnACTGN]+$")
if fa_p.match(to_convert[0]):
return _as_seqdict_sequences(to_convert, minsize)
return _genomepy_convert(to_convert, genome, minsize)
@as_seqdict.register(TextIOWrapper)
def _as_seqdict_file_object(to_convert, genome=None, minsize=None):
"""
Accepts file object as input, should be a FASTA file.
"""
fa = {x: y for x, y in SimpleFastaParser(to_convert)}
return _check_minsize(fa, minsize)
@as_seqdict.register(str)
def _as_seqdict_filename(to_convert, genome=None, minsize=None):
"""
Accepts filename as input.
"""
if not os.path.exists(to_convert):
raise ValueError("Assuming filename, but it does not exist")
with open(to_convert) as f:
fa = as_seqdict(f)
if any(fa):
return _check_minsize(fa, minsize)
with open(to_convert) as f:
line = ""
for line in f.readline():
if line == "":
break
if not line.startswith("#"):
break
if line == "":
raise IOError(f"empty file {to_convert}")
region_p = re.compile(r"^[^@]+@([^\s]+):(\d+)-(\d+)$")
if region_p.match(line.strip()):
regions = [myline.strip() for myline in [line] + f.readlines()]
return _as_seqdict_genome_regions(regions, minsize=None)
# Biopython parser resulted in empty dict
# Assuming it's a BED or region file
return _genomepy_convert(to_convert, genome, minsize)
@as_seqdict.register(pyfaidx.Fasta)
def _as_seqdict_pyfaidx(to_convert, genome=None, minsize=None):
"""
Accepts pyfaidx.Fasta object as input.
"""
fa = {k: str(v) for k, v in to_convert.items()}
return _check_minsize(fa, minsize)
@as_seqdict.register(pybedtools.BedTool)
def _as_seqdict_bedtool(to_convert, genome=None, minsize=None):
"""
Accepts pybedtools.BedTool as input.
"""
return _genomepy_convert(
[f"{f[0]}:{f[1]}-{f[2]}" for f in to_convert], genome, minsize
)
@as_seqdict.register(np.ndarray)
def _as_seqdict_array(to_convert, genome=None, minsize=None):
"""
Accepts numpy.ndarray with regions as input.
"""
return as_seqdict(list(to_convert), genome, minsize)
def as_fasta(to_convert, genome=None, minsize=None):
if isinstance(to_convert, Fasta):
return to_convert
return Fasta(fdict=as_seqdict(to_convert, genome, minsize))
# def file_checksum(fname):
# """Return md5 checksum of file.
#
# Note: only works for files < 4GB.
#
# Parameters
# ----------
# filename : str
# File used to calculate checksum.
#
# Returns
# -------
# checkum : str
# """
# size = os.path.getsize(fname)
# with open(fname, "r+") as f:
# checksum = hashlib.md5(mmap.mmap(f.fileno(), size)).hexdigest()
# return checksum
def join_max(a, length, sep="", suffix=""):
lengths = [len(x) for x in a]
total = 0
for i, size in enumerate(lengths + [0]):
if total > (length - len(suffix)):
return sep.join(a[: i - 1]) + suffix
if i > 0:
total += 1
total += size
return sep.join(a)
def check_genome(genome):
"""Check if genome is a valid FASTA file or genomepy genome genome.
Parameters
----------
genome : str
Genome name or file to check.
Returns
-------
is_genome : bool
"""
try:
Genome(genome, rebuild=False)
return True
except FileNotFoundError:
return False
def make_equal_length(a, b, pos, truncate=None, bg=None):
"""Align two weight matrices and make sure they are of equal length.
Align two matrices according to a specific position shift and either
extend the shorted with background frequences, or truncate the longer
matrix. The `truncate` argument can be either None, "first", "second" or
"both"
If `a` would represent the consensus "TGA" and `b` would represent "GAT",
the result, represented as consensus, for the different truncate arguments
would be:
`None` : "TGAN", "NGAT"
`"first"` : "TGA", "NGA"
`"second"`: "GAN", "GAT"
`"both"` : "GA", "GA"
Parameters
----------
a : array_like
Position-specific scoring, weight or probability matrix.
b : array_like
Position-specific scoring, weight or probability matrix.
truncate : str or None, optional
How to truncate the resulting matrices. Valid options are `None`,
`"first"`, `"second"` or `"both"`.
bg : array_like, optional
The background frequences, default `[0.25, 0.25, 0.25, 0.25]`.
Returns
-------
array_like, array_like
The matrices `a` and `b` shifted, and truncated according to the arguments.
Regardless of the arguments, the matrices are of equal length.
"""
if bg is None:
bg = [0.25, 0.25, 0.25, 0.25]
if truncate is not None and truncate not in ["both", "first", "second"]:
raise ValueError(
"Valid values for truncate are None, 'first', second' or 'both'"
)
truncate_first = False
truncate_second = False
truncate_first = truncate in ["both", "first"]
truncate_second = truncate in ["both", "second"]
len_a = len(a)
len_b = len(b)
mtx_len = max(len_a - pos, len_b) if pos < 0 else max(len_a, len_b + pos)
second_pos = max(pos, 0)
first_pos = max(-1 * pos, 0)
first = np.array([bg.copy() for x in range(mtx_len)])
first[first_pos : first_pos + len_a] = a
second = np.array([bg.copy() for x in range(mtx_len)])
second[second_pos : second_pos + len_b] = b
mask_first = np.zeros(mtx_len, dtype=bool)
mask_second = np.zeros(mtx_len, dtype=bool)
if truncate_first:
mask_second[first_pos : first_pos + len_a] = True
else:
mask_second[:] = True
if truncate_second:
mask_first[second_pos : second_pos + len_b] = True
else:
mask_first[:] = True
# if not truncate_first and not truncate_second:
return first[mask_second & mask_first], second[mask_second & mask_first]
def ppm_pseudocount(ppm, pseudo=1e-6):
"""Return position-specific probability matrix with added pseudocount.
Parameters
----------
ppm : array_like
Position-specific probability matrix.
pseudo : float
Pseudocount
Returns
-------
array_like
Position-specific probability matrix with added pseudocount.
"""
return (ppm + pseudo) / (ppm + pseudo).sum(1)[:, None]