ucsc_genomes_downloader/genome.py
# Copyright (C) 2019 by
# Luca Cappelletti <luca.cappelletti1@unimi.it>
#
# All rights reserved.
# MIT license.
""" Class to automatically retrieve informations and sequences for a Genome from the UCSC Genome Browser assembly database."""
import os
import json
import shutil
import compress_json
import dateparser
import pandas as pd
import numpy as np
from numba import typed, types
import warnings
from datetime import datetime
from userinput.utils import closest
from tqdm.auto import tqdm
from multiprocessing import Pool, cpu_count
from typing import Dict, Generator, List, Tuple
from .utils import get_available_genomes, get_available_chromosomes, download_chromosome_wrapper, get_genome_informations
from .utils import multiprocessing_gaps
from .utils import extract_sequence, extract_sequences
class Genome:
"""Class to automatically retrieve informations and sequences for a Genome from the UCSC Genome Browser assembly database.
Usage examples
--------------
Simply instanziate a new genome
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code:: python
from ucsc_genomes_downloader import Genome
hg19 = Genome("hg19")
Getting gaps regions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code:: python
all_gaps = hg19.gaps() # Returns gaps for all chromosomes
# Returns gaps for chromosome chrM
chrM_gaps = hg19.gaps(chromosomes=["chrM"])
Getting filled regions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code:: python
all_filled = hg19.filled() # Returns filled for all chromosomes
# Returns filled for chromosome chrM
chrM_filled = hg19.filled(chromosomes=["chrM"])
Removing genome's cache
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code:: python
hg19.delete()
"""
def __init__(
self,
assembly: str,
chromosomes: Tuple[str] = None,
filters: Tuple[str] = ("chru", "chrmt", "scaffold", "contig", "super", "chrbin", "random", "hap", "alt", "fix"),
verbose: bool = True,
cache_directory: str = "genomes"
):
"""Instantiate a new Genome object.
Parameters
----------
assembly: str,
UCSC Genome Browser assembly ID for required genome [1]_.
chromosomes: Tuple[str] = None,
Tuple with the chromosomes to download.
If None (default) all the chromosomes except
the one filtered out are downloaded.
filters: Tuple[str] = ("chru", "scaffold", "contig", "super", "chrbin", "random", "hap", "alt", "fix"),
Tuple containing substring of chromosomes NOT to download.
If the `chromosomes` parameter is used, no such filter is applied.
verbose: bool = True,
Whetever to show a loading bar when retrieving chromosomes.
cache_directory: str = "genomes",
Position where to store the downloaded genomes.
Raises
------
ValueError:
If the given genome cannot be retrieved either from the given
cache directory or from the UCSC website [3]_.
ValueError:
If the given genome, once the filters have been applied,
does not contain any chromosome.
Returns
-------
A newly instantiated Genome object.
References
----------
.. [1] https://genome.ucsc.edu/FAQ/FAQdownloads.html
"""
self._chromosomes_lenghts = None
self._genome_informations = None
self._verbose = verbose
self._assembly = assembly
self._cache_directory = os.path.join(cache_directory, self.assembly)
# If cache is enabled and a directory for the genome
# exists within the cache we try to load the genome data
if os.path.exists(self._cache_directory):
self._genome_informations = self._load_genome_informations()
self._chromosomes_lenghts = self._load_chromosomes()
# Otherwise we check if the genome is available
else:
available_genomes = get_available_genomes()
if self.assembly not in available_genomes:
# If the genome is not available we raise a proper exception
raise ValueError(
(
"Given genome {assembly} is not within the available genomes. "
"Did you mean to specify the {closest} genomic assembly?"
).format(
assembly=self.assembly,
closest=closest(
self.assembly,
available_genomes
)
))
# Download genome informations if list is not already loaded
if self._genome_informations is None:
self._genome_informations = get_genome_informations(self.assembly)
# If cache is enabled we store the obtained genome informations
self._store_genome_informations()
# Download chromosomes if list is not already loaded
if self._chromosomes_lenghts is None:
self._chromosomes_lenghts = get_available_chromosomes(
self.assembly)
# If cache is enabled we store the obtained chromosomes lenghts
self._store_chromosomes()
# Filtering chromosomes
self._chromosomes = typed.Dict.empty(
types.unicode_type,
types.unicode_type
)
for chrom in self._chromosomes_lenghts:
if all(target not in chrom.lower() for target in filters) and chromosomes is None or chromosomes is not None and chrom in chromosomes:
self._chromosomes[str(chrom)] = ""
# If no chromosome remains after filtering,
# for instance when the raw data are not yet mapped
# we raise a userwarning
if len(self) == 0:
raise ValueError(
"No chromosome remaining in chosen genome {assembly}"
"after executing desired filters"
"and checking for online availability".format(
assembly=self.assembly)
)
self.NUCLEOTIDES_MAPPING = typed.Dict.empty(types.string, types.string)
for src, dst in {
"a": "t",
"A": "T",
"g": "c",
"G": "C",
"t": "a",
"T": "A",
"c": "g",
"C": "G",
"N": "n",
"n": "N",
}.items():
self.NUCLEOTIDES_MAPPING[src] = dst
self._download()
self._load()
def _genome_informations_path(self) -> str:
"""Return path for the JSON file with current genome informations."""
return "{cache_directory}/genome_informations.json".format(
cache_directory=self._cache_directory
)
def _load_genome_informations(self) -> Dict:
"""Return a dictionary with genome informations if available.
Raises
------
RuntimeWarning:
If genome informations are not available
locally.
"""
try:
compress_json.load(self._genome_informations_path())
except Exception:
warnings.warn(
"Failed to load genome {genome} informations. "
"I will try to download them again afterwards.".format(
genome=self.assembly
),
RuntimeWarning
)
return None
def _store_genome_informations(self):
"""Store genome informations into default cache directory."""
compress_json.dump(
self._genome_informations,
self._genome_informations_path()
)
def _chromosomes_path(self) -> str:
"""Return path to default chromosomes informations."""
return "{cache_directory}/chromosomes.json".format(
cache_directory=self._cache_directory
)
def _load_chromosomes(self) -> Dict:
"""Return a dictionary with genome chromosomes if available.
Raises
------
RuntimeWarning:
If genome chromosomes are not available
locally.
"""
try:
return compress_json.load(self._chromosomes_path())
except Exception:
warnings.warn(
"Failed to load chromosomes for genome {genome}. "
"I will try to download them again afterwards.".format(
genome=self.assembly
),
RuntimeWarning
)
return None
def _store_chromosomes(self):
"""Store chromosomes informations into default cache directory."""
compress_json.dump(
self._chromosomes_lenghts,
self._chromosomes_path()
)
def _chromosome_path(self, chromosome: str) -> str:
"""Return path to the given chromosome.
Parameters
----------
chromosome: str,
The chromosome identifier, such as chr1, chrX, chrM...
"""
return "{cache_directory}/chromosomes/{chromosome}.json.gz".format(
cache_directory=self._cache_directory,
chromosome=chromosome
)
def _load_chromosome(self, chromosome: str) -> str:
"""Return the nucleotides sequence for the given chromosome.
Parameters
----------
chromosome: str,
The chromosome identifier, such as chr1, chrX, chrM...
Raises
------
RuntimeWarning:
If given chromosome are not available locally.
"""
path = self._chromosome_path(chromosome)
try:
return compress_json.load(path)["dna"]
except json.decoder.JSONDecodeError:
os.remove(path)
raise Exception(
"Chromosome at path {path} is corrupt and has been deleted.".format(path=path))
def delete(self):
"""Remove the genome cache."""
if os.path.exists(self._cache_directory):
shutil.rmtree(self._cache_directory)
def _download(self):
"""Download the missing chromosomes."""
tasks = [
{
"assembly": self.assembly,
"chromosome": chromosome,
"start": 0,
"end": self._chromosomes_lenghts[chromosome],
"path": self._chromosome_path(chromosome)
}
for chromosome in self
if not self.is_chromosome_cached(chromosome)
]
if len(tasks):
with Pool(min(cpu_count(), len(tasks))) as p:
list(tqdm(
p.imap(
download_chromosome_wrapper,
tasks
),
desc="Downloading chromosomes for genome {assembly}".format(
assembly=self.assembly
),
total=len(tasks),
disable=not self._verbose,
dynamic_ncols=True,
leave=False
))
p.close()
p.join()
def _load(self):
"""Load into memory all the genome's chromosomes, downloading them when necessary."""
for chromosome in tqdm(
self,
desc="Loading chromosomes for genome {assembly}".format(
assembly=self.assembly
),
total=len(self),
disable=not self._verbose,
dynamic_ncols=True,
leave=False
):
self._chromosomes[chromosome] = self._load_chromosome(
chromosome
)
def __len__(self) -> int:
"""Return the number of chromosomes in current genome."""
return len(self._chromosomes)
def __contains__(self, chromosome: str) -> bool:
"""Return boolean representing if given chromosome is contained in current genome.
Parameters
----------
chromosome: str,
The chromosome identifier, such as chr1, chrX, chrM...
Returns
-------
Boolean representing if given chromosome is contained in current genome.
"""
return chromosome in self._chromosomes
def __getitem__(self, chromosome: str):
"""Return sequence data for given chromosome.
Parameters
----------
chromosome: str,
The chromosome identifier, such as chr1, chrX, chrM...
Returns
-------
String containing sequence data for given chromosome.
"""
return self._chromosomes[chromosome]
def items(self) -> Generator:
"""Return generator to iterate through tuples of chromosomes and corresponding sequences."""
for chromosome in self:
yield chromosome, self[chromosome]
def __iter__(self) -> Generator:
"""Return generator to iterate through the genome's chromosomes."""
for chromosome in self._chromosomes:
yield chromosome
def is_chromosome_cached(self, chromosome: str) -> bool:
"""Return a boolean representing if given chromosome is cached.
Parameters
----------
chromosome: str,
The chromosome identifier, such as chr1, chrX, chrM...
Returns
-------
Boolean representing if given chromosome is available locally.
"""
return os.path.exists(self._chromosome_path(chromosome))
def __str__(self):
"""Return string representation of current genome."""
return "{organism}, {scientific_name}, {genome}, {date}, {chromosomes} chromosomes".format(
organism=self.organism,
scientific_name=self.scientific_name,
genome=self.assembly,
date=self.date,
chromosomes=len(self)
)
__repr__ = __str__
def gaps(self, chromosomes: List[str] = None):
"""Return dataframe in BED format with informations on the gaps.
Parameters
----------
chromosomes: List[str] = None,
List of the chromosomes to parse, by default all.
FAQs
----
Why don't you just retrieve the gaps from the APIs?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The gaps track is not always available, therefore for providing a consistent
usage experience we compute the gaps when required. Additionally, this method
will work also when you make an offline request, that may sometimes be quite
useful.
Returns
-------
A DataFrame in BED format containing the gapped regions.
"""
if chromosomes is None:
chromosomes = list(self)
with Pool(min(cpu_count(), len(self))) as p:
gaps = pd.concat(list(tqdm(
p.imap(multiprocessing_gaps, (
(
chromosome, self[chromosome]
)
for chromosome in chromosomes
)),
total=len(chromosomes),
desc="Rendering gaps in {assembly}".format(
assembly=self.assembly
),
leave=False,
dynamic_ncols=True,
disable=not self._verbose
))).reset_index(drop=True)
p.close()
p.join()
return gaps
def filled(self, chromosomes: List[str] = None):
"""Return dataframe with BED-like columns with informations on the gaps.
Parameters
----------
chromosomes: List[str] = None,
List of the chromosomes to parse, by default all.
Returns
-------
A DataFrame in BED format containing the filled regions.
"""
non_gap = []
gapped_chromosomes = []
if chromosomes is None:
chromosomes = list(self)
for chrom, values in self.gaps(chromosomes).groupby("chrom"):
# We need to sort by the chromStart
# as we will need the rows to be ordered
# to be able to generate the complementary windows
values = values.sort_values(["chromStart"])
non_gap_values = pd.DataFrame({
"chrom": chrom,
"chromStart": values["chromEnd"][:-1].values,
"chromEnd": values["chromStart"][1:].values,
})
# If the chromosome lenght is not contained
# within the various chromEnd values it means that
# the final part of the chromosome is known
# and therefore considered filled.
# We need to add this additional row.
chromosome_lenght = self._chromosomes_lenghts[chrom]
if not values.chromEnd.isin([chromosome_lenght]).any():
non_gap_values = non_gap_values.append({
"chrom": chrom,
"chromStart": values.chromEnd.max(),
"chromEnd": chromosome_lenght
}, ignore_index=True)
# If the chromosome start, the 0 value,
# is not contained within the various chromStart values
# it means that the initial part of the chromosome
# is known and therefore considered filled.
# We need to add this additional row.
if not values.chromStart.isin([0]).any():
non_gap_values = non_gap_values.append({
"chrom": chrom,
"chromStart": 0,
"chromEnd": values.chromStart.min(),
}, ignore_index=True)
non_gap.append(non_gap_values)
gapped_chromosomes.append(chrom)
# When a chromosome does not appear to have
# any gap is considered fully filled
non_gap.append(pd.DataFrame([
{
"chrom": chrom,
"chromStart": 0,
"chromEnd": self._chromosomes_lenghts[chrom]
} for chrom in chromosomes if chrom not in gapped_chromosomes
]))
return pd.concat(non_gap).sort_values(["chrom"]).reset_index(drop=True)
def extract_sequence(self, chrom: str, chromStart: int, chromEnd: int, strand: str = ".") -> str:
"""Return genomic sequence for given coordinates.
Parameters
------------------------------------
chrom: str,
Chromosome to use.
chromStart: int,
Position from where to start slicing.
chromEnd: int,
Position where to stop the slice.
Returns
-------------------------------------
Sequence of nucleotides.
"""
return extract_sequence(
self._chromosomes,
chrom,
chromStart,
chromEnd,
strand,
self.NUCLEOTIDES_MAPPING
)
def bed_to_sequence(self, bed: pd.DataFrame) -> List[str]:
"""Return bed with an additional column containing the sequences."""
return extract_sequences(
self._chromosomes,
bed.chrom.values.astype(str),
bed.chromStart.values.astype(int),
bed.chromEnd.values.astype(int),
bed.strand.values.astype(str) if "strand" in bed.columns else None,
self.NUCLEOTIDES_MAPPING
)
@property
def assembly(self) -> str:
"""Return genome's UCSC Genome Browser assembly ID."""
return self._assembly
@property
def date(self) -> datetime:
"""Return release date."""
return dateparser.parse(self.description.split("(")[0]).date()
@property
def organism(self) -> str:
"""Return genome's organism."""
return self._genome_informations["organism"]
@property
def scientific_name(self):
"""Return genome's organism scientific name."""
return self._genome_informations["scientificName"]
@property
def description(self):
"""Return genome's description as provided by UCSC."""
return self._genome_informations["description"]
@property
def path(self):
"""Return genome's path."""
return self._cache_directory