reimandlab/Visualistion-Framework-for-Genome-Mutations

View on GitHub
website/models/bio/mutations.py

Summary

Maintainability
F
3 days
Test Coverage
from abc import ABCMeta, abstractmethod
from collections import UserList, OrderedDict
from functools import lru_cache
from typing import Type, Iterable, Mapping, List, Dict, TYPE_CHECKING

from sqlalchemy import select, func, or_, and_, exists
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.hybrid import hybrid_property, Comparator, hybrid_method
from sqlalchemy.orm import synonym, RelationshipProperty

from database import db
from database.types import ScalarSet
from helpers.models import generic_aggregator

from .diseases import ClinicalData
from .model import BioModel, make_association_table
from .sites import Site, SiteMotif


if TYPE_CHECKING:
    from .protein import Protein


class MutationDetailsManager(UserList, metaclass=ABCMeta):
    """Groups (by a unique mutation) mutation details from a given source"""

    @property
    @abstractmethod
    def name(self) -> str:
        """Internal name which has to remain unchanged, used in relationships"""
        pass

    @property
    @abstractmethod
    def value_type(self):
        """What is the value returned by 'get_value'.

        It can be 'count', 'frequency' or 'probability'.
        """
        pass

    @abstractmethod
    def to_json(self):
        pass

    def get_value(self, filter=lambda x: x):
        return sum(
            (
                data.get_value()
                for data in filter(self.data)
            )
        )

    summary = generic_aggregator('summary', is_callable=True)


def create_cancer_meta_manager(meta_name):

    class CancerMetaManager(MutationDetailsManager):
        name = meta_name
        value_type = 'count'

        def to_json(self, filter=lambda x: x):
            cancer_occurrences = [
                datum.to_json()
                for datum in
                filter(self.data)
            ]

            return {'Cancers': cancer_occurrences}

        def summary(self, filter=lambda x: x):
            return [
                datum.summary()
                for datum in self.data
                # one could use:

                # for datum in filter(self.data)

                # so only the cancer data from currently selected cancer type are show,
                # but then the user may think that the mutation is specific for this
                # type of cancer
            ]

    return CancerMetaManager


class MIMPMetaManager(MutationDetailsManager):

    name = 'MIMP'
    value_type = 'probability'

    @staticmethod
    def sort_by_probability(mimps):
        return mimps.sort(
            key=lambda mimp: mimp.probability,
            reverse=True
        )

    def to_json(self, filter=lambda x: x):

        gains = []
        losses = []

        for mimp in self.data:
            if mimp.is_gain:
                gains.append(mimp)
            else:
                losses.append(mimp)

        self.sort_by_probability(gains)
        self.sort_by_probability(losses)

        return {
            'gain': [mimp.to_json() for mimp in gains],
            'loss': [mimp.to_json() for mimp in losses],
            'effect': self.effect
        }

    @property
    def effect(self):
        effects = set()

        # TODO: sort by p-value, so first we will have loss
        # if the loss data is more reliable and vice versa.
        for mimp in self.data:
            if mimp.is_gain:
                effects.add('gain')
            else:
                effects.add('loss')

        return '/'.join(effects)

    @property
    def sites(self):
        return set(
            (
                mimp.site
                for mimp in self.data
            )
        )


class MutationDetails:
    """Base for tables defining detailed metadata for specific mutations"""

    def get_value(self, filter=lambda x: x):
        """Return number representing value to be used in needleplot"""
        raise NotImplementedError

    def to_json(self, filter=lambda x: x):
        """Return JSON serializable representation for needleplot tooltips.

        The default serializer is: json.JSONEncoder
            https://docs.python.org/3/library/json.html#json.JSONEncoder
        """
        raise NotImplementedError

    def summary(self, filter=lambda x: x):
        """Return short JSON serializable representation of the mutation"""
        raise NotImplementedError

    @property
    def display_name(self):
        """Name which will be shown to the user (e.g. in filters panel)"""
        raise NotImplementedError

    @property
    def name(self) -> str:
        """Internal name which has to remain unchanged, used in relationships"""
        raise NotImplementedError

    @property
    def is_confirmed(self):
        """Does this mutation come from an experiment (True) or it is a prediction (False)?"""
        return True

    @property
    def is_visible(self):
        """Should this mutation be shown to the user?"""
        return True

    @property
    def value_type(self):
        """What is the value returned by 'get_value'.

        Either 'count' or 'frequency'.
        """
        raise NotImplementedError

    def __repr__(self):
        return '<{class_name} - details of {refseq}:{mutation} with summary: {summary}>'.format(
            class_name=self.name,
            refseq=self.mutation.protein.refseq,
            mutation=self.mutation.short_name,
            summary=self.summary()
        )


class MappedMutationDetails(MutationDetails):

    mutation: 'Mutation'

    @declared_attr
    def mutation_id(cls):
        return db.Column(db.Integer, db.ForeignKey('mutation.id'), unique=True)


MutationSource = Type[MutationDetails]


class ManagedMutationDetails(MappedMutationDetails):
    """For use when one mutation is related to many mutation details entries."""

    @property
    def details_manager(self):
        """Collection class (deriving from MutationDetailsManager) which will
        aggregate results from all the mutation details (per mutation).
        """
        raise NotImplementedError

    @declared_attr
    def mutation_id(cls):
        return db.Column(db.Integer, db.ForeignKey('mutation.id'))


class UserUploadedMutation(MutationDetails):

    name = 'user'
    display_name = 'User\'s mutations'

    # user mutations should be visible in interface,
    # but should not be included in whole-proteome analyses
    is_confirmed = False

    value_type = 'count'

    def __init__(self, count, query, mutation):
        self.count = count
        self.query = query
        self.mutation = mutation

    def get_value(self, filter=lambda x: x):
        return self.count

    def to_json(self, filter=lambda x: x):
        return {
            'Query': self.query,
            'Value': self.count
        }

    def summary(self, filter=lambda x: x):
        return self.query


class CancerMutation(ManagedMutationDetails):

    samples = db.Column(db.Text(), default='')
    value_type = 'count'
    count = db.Column(db.Integer)

    @declared_attr
    def cancer_id(self):
        return db.Column(db.Integer, db.ForeignKey('cancer.id'))

    @declared_attr
    def cancer(self):
        return db.relationship('Cancer')

    def get_value(self, filter=lambda x: x):
        return self.count

    def to_json(self, filter=None):
        return {
            'Cancer': self.cancer.name,
            'Value': self.count
        }

    def summary(self, filter=lambda x: x):
        return self.cancer_code


class PCAWGMutation(CancerMutation, BioModel):
    """Metadata for cancer mutations from PCAWG project"""
    name = 'PCAWG'
    display_name = 'Cancer (PCAWG)'
    details_manager = create_cancer_meta_manager('PCAWG')
    pcawg_cancer_code = association_proxy('cancer', 'code')
    cancer_code = pcawg_cancer_code


class MC3Mutation(CancerMutation, BioModel):
    """Metadata for cancer mutations from ICGC data portal"""
    name = 'MC3'
    display_name = 'Cancer (TCGA PanCancerAtlas)'
    details_manager = create_cancer_meta_manager('MC3')
    mc3_cancer_code = association_proxy('cancer', 'code')
    cancer_code = mc3_cancer_code


class TCGAMutation(CancerMutation, BioModel):
    name = 'TCGA'
    display_name = 'Cancer (TCGA Pancan12)'
    details_manager = create_cancer_meta_manager('TCGA')
    tcga_cancer_code = association_proxy('cancer', 'code')
    cancer_code = tcga_cancer_code


class InheritedMutation(MappedMutationDetails, BioModel):
    """Metadata for inherited diseased mutations from ClinVar from NCBI

    Roughly corresponds to VCV in 2017+ ClinVar
    Columns description come from source VCF file headers.
    """
    name = 'ClinVar'
    display_name = 'Clinical (ClinVar)'
    value_type = 'count'

    # RS: dbSNP ID (i.e. rs number)
    db_snp_ids = db.Column(ScalarSet(element_type=int), default=set)

    # TODO: use dbSNP to retrieve the VLD & PMC data?

    # VLD: This bit is set if the variant has 2+ minor allele
    # count based on frequency or genotype data
    # is_validated = db.Column(db.Boolean)

    # PMC: Links exist to PubMed Central article
    # is_in_pubmed_central = db.Column(db.Boolean)

    clin_data = db.relationship('ClinicalData', uselist=True)

    # combined_significance = association_proxy('clin_data', 'combined_significance')

    # CLNSIG: Variant Clinical Significance, as reported by ClinVar
    # https://www.ncbi.nlm.nih.gov/clinvar/docs/variation_report/
    # combined on variation level (VCV level)
    combined_significances = db.Column(ScalarSet(separator='|'), default=set)

    sig_code = association_proxy('clin_data', 'sig_code')
    gold_stars = association_proxy('clin_data', 'gold_stars')

    disease_name = association_proxy('clin_data', 'disease_name')
    disease_id = association_proxy('clin_data', 'disease_id')

    # Because there are many possible SNVs for each protein mutations,
    # multiple variation ids are to be collected:
    variation_ids_list = association_proxy('clin_data', 'variation_id')

    @hybrid_property
    def variation_ids(self):
        return set(self.variation_ids_list)

    @property
    def clinical_associations_by_disease_name(self) -> Dict[str, ClinicalData]:
        return {
            association.disease.name: association
            for association in self.clin_data
        }

    def get_value(self, filter=lambda x: x):
        return len(filter(self.clin_data))

    @hybrid_property
    def count(self):
        return self.get_value()

    @count.expression
    def count(cls):
        return (
            select([func.count(ClinicalData.id)])
            .where(ClinicalData.inherited_id == cls.id)
            .label('count')
        )

    def to_json(self, filter=lambda x: x):
        return {
            'dbSNP id': list(self.db_snp_ids) or [],
            # 'Is validated': bool(self.is_validated),
            # 'Is in PubMed Central': bool(self.is_in_pubmed_central),
            'Clinical': [
                d.to_json()
                for d in filter(self.clin_data)
            ]
        }

    def summary(self, filter=lambda x: x):
        return list(set(
            d.disease_name
            for d in filter(self.clin_data)
        ))

    @classmethod
    def significance_set_filter(cls, mode: str):
        return cls._significance_filter(ClinicalData.significance_subsets[mode])

    @classmethod
    def significance_filter(cls, significance: str):
        return cls._significance_filter([significance])

    @classmethod
    def _significance_filter(cls, significances: List[str]):
        """If used on mutation, requires: .join(InheritedMutation).join(ClinicalData)"""
        significance_to_code = {
            significance: code
            for code, significance in ClinicalData.significance_codes.items()
        }
        return or_(*[
            InheritedMutation.sig_code.contains(significance_to_code[sig])
            for sig in significances
        ])


def population_manager(_name, _display_name):

    class PopulationManager(MutationDetailsManager):
        """Some population mutations annotations are provided in form of a several
        separate records, where all the records relate to the same aminoacid mutation.

        Assuming that the frequencies are summable, one could merge the annotations
        at the time of import. However, the information about the distribution of
        nucleotide mutation frequencies would be lost if merging at import time.

        Aggregation of the mutations details in PopulationManager allows to have both
        overall and nucleotide-mutation specific frequencies included."""
        name = _name
        display_name = _display_name
        value_type = 'frequency'

        affected_populations = property(generic_aggregator('affected_populations', flatten=True))

        def to_json(self, filter=lambda x: x):
            data = filter(self.data)
            json = data[0].to_json()
            for datum in data[1:]:
                for k, v in datum.to_json().items():
                    json[k] = str(json[k]) + ', ' + str(v)
            return json

    return PopulationManager


class PopulationsComparator(Comparator):
    """Given a population name, or list of population names,
    determine if PopulationMutation include this population
    (i.e. has non-zero frequency of occurrence in the population)
    """

    def __init__(self, cls):
        self.populations_fields = cls.populations_fields()
        self.cls = cls

    def __eq__(self, population_name):
        return getattr(self.cls, self.populations_fields[population_name]) > 0

    def in_(self, population_names):

        return or_(
            *[
                getattr(self.cls, self.populations_fields[population_name]) > 0
                for population_name in population_names
            ]
        )


class PopulationMutation(ManagedMutationDetails):
    """Metadata common for mutations from all population-wide studies

    MAF:
        All - total value
    """
    populations = {
        # place mappings here: field name -> population name
    }

    @classmethod
    def populations_fields(cls):
        return {v: k for k, v in cls.populations.items()}

    maf_all = db.Column(db.Float)
    scale = 1

    def get_value(self, filter=lambda x: x):
        """Total MAF in percents"""
        return self.maf_all

    def summary(self, filter=lambda x: x):
        return self.get_value()

    @hybrid_property
    def affected_populations(self):
        return [
            population_name
            for field, population_name in self.populations.items()
            if getattr(self, field)
        ]

    @affected_populations.comparator
    def affected_populations(cls):
        return PopulationsComparator(cls)


class ExomeSequencingMutation(PopulationMutation, BioModel):
    """Metadata for ESP 6500 mutation

    MAF:
        EA - European American
        AA - African American
    """

    name = 'ESP6500'
    display_name = 'Population (ESP 6500)'
    details_manager = population_manager(name, display_name)

    value_type = 'frequency'

    populations = OrderedDict(
        (
            ('maf_ea', 'European American'),
            ('maf_aa', 'African American')
        )
    )

    maf_ea = db.Column(db.Float)
    maf_aa = db.Column(db.Float)

    populations_ESP6500 = synonym('affected_populations')

    def to_json(self, filter=lambda x: x):
        return {
            'MAF': self.maf_all,
            'MAF EA': self.maf_ea,
            'MAF AA': self.maf_aa,
        }


class The1000GenomesMutation(PopulationMutation, BioModel):
    """Metadata for 1 KG mutation"""
    name = '1KGenomes'
    display_name = 'Population (1000 Genomes)'
    details_manager = population_manager(name, display_name)

    value_type = 'frequency'

    maf_eas = db.Column(db.Float)
    maf_amr = db.Column(db.Float)
    maf_afr = db.Column(db.Float)
    maf_eur = db.Column(db.Float)
    maf_sas = db.Column(db.Float)

    scale = 100

    # note: those are defined as super populations by 1000 Genomes project
    populations = OrderedDict(
        (
            ('maf_eas', 'East Asian'),
            ('maf_amr', 'Ad Mixed American'),
            ('maf_afr', 'African'),
            ('maf_eur', 'European'),
            ('maf_sas', 'South Asian')
        )
    )

    def to_json(self, filter=lambda x: x):
        json = {
            'MAF': self.maf_all,
            'MAF EAS': self.maf_eas,
            'MAF AMR': self.maf_amr,
            'MAF AFR': self.maf_afr,
            'MAF EUR': self.maf_eur,
            'MAF SAS': self.maf_sas,
        }
        return {
            name: (maf * self.scale if maf else None)
            for name, maf in json.items()
        }

    def get_value(self, filter=lambda x: x):
        return self.maf_all * self.scale

    populations_1KG = synonym('affected_populations')


class MIMPMutation(ManagedMutationDetails, BioModel):
    """Metadata for MIMP mutation"""

    name = 'MIMP'
    display_name = 'MIMP'
    is_confirmed = False
    is_visible = False

    details_manager = MIMPMetaManager

    site_id = db.Column(db.Integer, db.ForeignKey('site.id'))
    site = db.relationship('Site')

    probability = db.Column(db.Float)

    pwm = db.Column(db.Text)
    pwm_family = db.Column(db.Text)

    kinase = db.relationship(
        'Kinase',
        primaryjoin='foreign(Kinase.name)==MIMPMutation.pwm',
        uselist=False
    )
    kinase_group = db.relationship(
        'KinaseGroup',
        primaryjoin='foreign(KinaseGroup.name)==MIMPMutation.pwm_family',
        uselist=False
    )

    # gain = +1, loss = -1
    effect = db.Column(db.Boolean)

    effect_map = {
        'gain': 1,
        'loss': 0
    }

    def __init__(self, **kwargs):
        effect = kwargs.get('effect', None)
        if effect in self.effect_map:
            kwargs['effect'] = self.effect_map[effect]
        super().__init__(**kwargs)

    @property
    def is_gain(self):
        return self.effect

    @property
    def is_loss(self):
        return not self.effect

    # position of a mutation in an associated motif
    position_in_motif = db.Column(db.Integer)

    def to_json(self, filter=lambda x: x):
        return {
            'effect': 'gain' if self.is_gain else 'loss',
            'pwm': self.pwm,
            'kinase': {'refseq': self.kinase.protein.refseq} if self.kinase and self.kinase.protein else {},
            'pos_in_motif': self.position_in_motif,
            'pwm_family': self.pwm_family,
            'site': self.site.to_json() if self.site else None,
            'probability': self.probability
        }


def details_proxy(target_class, key, managed=False):
    """Creates an AssociationProxy targeting MutationDetails descendants.
    The proxy enables easy filtering and retrieval of the otherwise deeply
    hidden information and should be owned by (i.e. declared in) Mutation model.

    Attributes:
        managed: if True, a request for server side filtering system
            (as opposite to database side filtering) to use attribute
            of name 'key' from 'MutationDetailsManager' (instead of
            the one from the MutationDetails itself) will be encoded
            in the proxy function.

    """

    field_name = 'meta_' + target_class.name
    proxy = association_proxy(field_name, key)

    if managed:
        def proxy_to_details_manager(object):
            if type(object) is Mutation:
                manager = getattr(object, field_name)
            elif isinstance(object, ManagedMutationDetails):
                manager = object.details_manager
            else:
                assert isinstance(object, MutationDetailsManager)
                manager = object
            return getattr(manager, key)
        proxy.custom_attr_getter = proxy_to_details_manager
    else:
        proxy.custom_attr_getter = None

    return proxy


def are_details_managed(model):
    return ManagedMutationDetails in model.mro()


def mutation_details_relationship(model, use_list=False, **kwargs):
    return db.relationship(
        model,
        backref='mutation',
        uselist=use_list,
        **kwargs
    )


def managed_mutation_details_relationship(model):
    return mutation_details_relationship(
        model,
        use_list=True,
        collection_class=model.details_manager
    )


class Sources:

    def __init__(self, all_sources):
        self.all = all_sources

        self.relationships = {
            'meta_' + source.name: (
                managed_mutation_details_relationship(source)
                if are_details_managed(source) else
                mutation_details_relationship(source)
            )
            for source in all_sources
            if issubclass(source, MappedMutationDetails)
        }

        self.visible: List[MutationSource] = [
            source for source in all_sources if source.is_visible
        ]
        self.confirmed: List[MutationSource] = [
            source for source in all_sources if source.is_confirmed
        ]

        self.fields: Mapping[MutationSource, str] = {
            source: 'meta_' + source.name
            for source in all_sources
        }

        self.visible_fields: Mapping[str, str] = {
            source.name: 'meta_' + source.name
            for source in self.visible
        }

    def get_relationship(self, source: MutationSource) -> RelationshipProperty:
        return self.class_relation_map[source]

    def get_bound_relationship(self, mutation, source: MutationSource) -> MutationDetails:
        try:
            return getattr(mutation, self.fields[source])
        except AttributeError:
            return None

    @property
    @lru_cache()
    def class_relation_map(self) -> Dict[MutationSource, RelationshipProperty]:
        return {
            model: getattr(Mutation, self.fields[model])
            for model in self.all
            if issubclass(model, MappedMutationDetails)
        }

    @property
    @lru_cache()
    def source_by_name(self) -> Dict[str, MutationSource]:
        return {
            model.name: model
            for model in self.all
        }


source_manager = Sources([
    # order matters (widget's labels will show up in this order)
    # TCGAMutation,
    MC3Mutation,
    PCAWGMutation,
    InheritedMutation,
    ExomeSequencingMutation,
    The1000GenomesMutation,
    UserUploadedMutation,
    MIMPMutation
])


class MutatedMotifs:

    were_affected_motifs_precomputed = db.Column(db.Boolean, default=False)

    @declared_attr
    def precomputed_affected_motifs(self):
        mutation_motif_table = make_association_table(f'{self.__tablename__}.id', SiteMotif.id)
        return db.relationship(
            SiteMotif,
            secondary=mutation_motif_table,
            collection_class=set,
            backref='mutations'
        )

    def affected_motifs(self, sites: Iterable[Site] = None):
        if self.were_affected_motifs_precomputed:
            return self.precomputed_affected_motifs

        from analyses.motifs import mutate_sequence
        from analyses.motifs import has_motif

        affected_motifs = []

        if not sites:
            sites = self.affected_sites

        for site in sites:
            for site_type in site.types:

                for motif in site_type.motifs:

                    if site.has_motif(motif.pattern):
                        # todo: make it a method of mutation? "self.mutate_sequence()" ?

                        mutated_sequence = mutate_sequence(site, self, offset=7)
                        if not has_motif(mutated_sequence, motif.pattern):
                            affected_motifs.append((motif, self.position - site.position + 7))

        return affected_motifs


class Mutation(BioModel, MutatedMotifs):
    __table_args__ = (
        db.Index('mutation_index', 'alt', 'protein_id', 'position'),
        db.UniqueConstraint('alt', 'protein_id', 'position')
    )

    # 1-based
    position = db.Column(db.Integer)
    alt = db.Column(db.String(1))
    protein_id = db.Column(db.Integer, db.ForeignKey('protein.id'))
    protein: 'Protein'

    # To speed up results retrieval one can precompute value of
    # 'is_ptm' property. It does not carry meaningful information
    # for novel mutations until correctly precomputed (e.g. with
    # instruction: m.precomputed_is_ptm = m.is_ptm).
    # You can distinguish if it was precomputed: check if the value
    # is different than None. Be careful with boolean evaluation!
    precomputed_is_ptm = db.Column(db.Boolean)

    types = ('direct', 'network-rewiring', 'motif-changing', 'proximal', 'distal', 'none')

    vars().update(source_manager.relationships)

    populations_1KG = details_proxy(The1000GenomesMutation, 'affected_populations', managed=True)
    populations_ESP6500 = details_proxy(ExomeSequencingMutation, 'affected_populations', managed=True)
    mc3_cancer_code = details_proxy(MC3Mutation, 'mc3_cancer_code')
    pcawg_cancer_code = details_proxy(PCAWGMutation, 'pcawg_cancer_code')
    sig_code = details_proxy(InheritedMutation, 'sig_code')
    gold_stars = details_proxy(InheritedMutation, 'gold_stars')
    disease_id = details_proxy(InheritedMutation, 'disease_id')
    disease_name = details_proxy(InheritedMutation, 'disease_name')

    def __repr__(self):
        return '<Mutation in {0}, at {1} aa, substitution to: {2}>'.format(
            self.protein.refseq if self.protein else '-',
            self.position,
            self.alt
        )

    @hybrid_property
    def sources_map(self) -> Mapping[str, MutationDetails]:
        """Return mapping: name -> bound relationship for >confirmed< sources that mention this mutation"""
        mapping = {}
        for source in source_manager.confirmed:
            details = source_manager.get_bound_relationship(self, source)
            if details:
                mapping[source.name] = details
        return mapping

    @hybrid_property
    def sources(self) -> List[str]:
        """Return list of names of >visible< sources that mention this mutation"""
        return [
            source.name
            for source in source_manager.visible
            if source_manager.get_bound_relationship(self, source)
        ]

    @hybrid_property
    def is_confirmed(self):
        """Mutation is confirmed if there are metadata from one of four studies

        (or experiments). Presence of MIMP metadata does not imply
        if mutation has been ever studied experimentally before.
        """
        return any([source_manager.get_bound_relationship(self, source) for source in source_manager.confirmed])

    @is_confirmed.expression
    def is_confirmed(cls):
        """SQL expression for is_confirmed"""
        return or_(
            relationship_field.any()
            if relationship_field.prop.uselist
            else relationship_field.has()
            for relationship_field in [
                source_manager.get_relationship(source) for source in source_manager.confirmed
            ]
        )

    @hybrid_property
    def sites(self):
        return self.get_affected_ptm_sites()

    @sites.expression
    def sites(self):
        return self.affected_sites

    affected_sites = db.relationship(
        'Site',
        primaryjoin=(
            'and_('
            '   foreign(Site.protein_id) == Mutation.protein_id,'
            '   Site.position.between(Mutation.position - 7, Mutation.position + 7)'
            ')'
        ),
        order_by='Site.position'
    )

    @hybrid_method
    def is_ptm(self, filter_manager=None):
        """Mutation is PTM related if it may affect PTM site.

        Mutations flanking PTM site in a distance up to 7 residues
        from a site (called here 'distal') will be included too.

        This method works very similarly to is_ptm_distal property.
        """
        sites = self.protein.sites
        if filter_manager:
            sites = filter_manager.apply(sites)
        return self.is_close_to_some_site(7, 7, sites)

    @hybrid_property
    def ref(self):
        sequence = self.protein.sequence
        return sequence[self.position - 1]

    @hybrid_property
    def is_ptm_direct(self):
        """True if the mutation is on the same position as some PTM site."""
        return self.is_close_to_some_site(0, 0)

    @hybrid_property
    def is_ptm_proximal(self):
        """Check if the mutation is in close proximity of some PTM site.

        Proximity is defined here as [pos - 2, pos + 2] span,
        where pos is the position of a PTM site.
        """
        return self.is_close_to_some_site(2, 2)

    @hybrid_property
    def is_ptm_distal(self):
        """Check if the mutation is distal flanking mutation of some PTM site.

        Distal flank is defined here as [pos - 7, pos + 7] span,
        where pos is the position of a PTM site.
        """
        # if we have precomputed True or False (i.e. it's known
        # mutations - so we precomputed this) then return this
        if self.precomputed_is_ptm is not None:
            return self.precomputed_is_ptm
        # otherwise it's a novel mutation - let's check proximity
        return self.is_close_to_some_site(7, 7)

    def get_affected_ptm_sites(self, site_filter=lambda x: x):
        """Get PTM sites that might be affected by this mutation,

        when taking into account -7 to +7 spans of each PTM site.
        """
        sites = site_filter(self.protein.sites)
        pos = self.position
        a = 0
        b = len(sites)
        sites_affected = set()

        while a != b:
            pivot = (b - a) // 2 + a
            site_pos = sites[pivot].position
            if site_pos - 7 <= pos <= site_pos + 7:
                hit = pivot
                sites_affected.add(sites[pivot])
                break
            if pos > site_pos:
                a = pivot + 1
            else:
                b = pivot
        else:
            return tuple()

        def cond():
            try:
                site_pos = sites[pivot].position
                return site_pos - 7 <= pos <= site_pos + 7
            except IndexError:
                return tuple()

        # go to right from found site, check if there is more overlapping sites
        pivot = hit + 1
        while cond():
            sites_affected.add(sites[pivot])
            pivot += 1

        # and then go to the left
        pivot = hit - 1
        while cond():
            sites_affected.add(sites[pivot])
            pivot -= 1

        return sorted(sites_affected, key=lambda site: site.position)

    def impact_on_specific_ptm(self, site: Site, ignore_mimp=False):
        if self.position == site.position:
            return 'direct'
        elif site in self.meta_MIMP.sites and not ignore_mimp:
            return 'network-rewiring'
        elif self.affected_motifs([site]):
            return 'motif-changing'
        elif abs(self.position - site.position) < 3:
            return 'proximal'
        elif abs(self.position - site.position) < 8:
            return 'distal'
        else:
            return 'none'

    def impact_on_ptm(self, site_filter=None):
        """How intense might be an impact of the mutation on a PTM site.

        It describes impact on the closest PTM site or on a site chosen by
        MIMP algorithm (so it applies only when 'network-rewiring' is returned)
        """
        if site_filter is None:
            sites = self.sites
        else:
            sites = site_filter(self.sites)

        if self.is_close_to_some_site(0, 0, sites):
            return 'direct'
        elif any(site in sites for site in self.meta_MIMP.sites):
            return 'network-rewiring'
        elif self.affected_motifs(sites):
            return 'motif-changing'
        elif self.is_close_to_some_site(2, 2, sites):
            return 'proximal'
        elif self.is_close_to_some_site(7, 7, sites):
            return 'distal'
        return 'none'

    def find_closest_sites(self, distance=7, site_filter=lambda x: x):
        # TODO: implement site type filter
        pos = self.position

        sites = Site.query.filter(
            and_(
                Site.protein_id == self.protein_id,
                Site.position.between(pos - distance, pos + distance)
            )
        ).order_by(func.abs(Site.position - pos)).limit(2).all()

        if(
            len(sites) == 2 and
            abs(sites[0].position - pos) != abs(sites[1].position - pos)
        ):
            return sites[:1]
        else:
            return sites

    @hybrid_method
    def is_close_to_some_site(self, left, right, sites=None):
        """Check if the mutation lies close to any of sites.

        Arguments define span around each site to be checked:
        (site_pos - left, site_pos + right)
        site_pos is the position of a site

        Algorithm is based on bisection and an assumption,
        that sites are sorted by position in the database.
        """
        if sites is None:
            sites = self.protein.sites
        pos = self.position
        a = 0
        b = len(sites)
        while a != b:
            p = (b - a) // 2 + a
            site_pos = sites[p].position
            if site_pos - left <= pos <= site_pos + right:
                return True
            if pos > site_pos:
                a = p + 1
            else:
                b = p
        return False

    @is_close_to_some_site.expression
    def is_close_to_some_site(self, left, right):
        """SQL expression for is_close_to_some_site"""
        position = self.position
        q = exists().where(
            and_(
                Site.protein_id == self.protein_id,
                Site.position.between(position - left, position + right)
            )
        )
        return db.session.query(q).scalar()

    @property
    def short_name(self):
        return f'{self.ref}{self.position}{self.alt}'

    @property
    def name(self):
        return f'{self.protein.gene.name} {self.short_name}'

    def to_json(self):
        return {
            'name': self.name,
            'is_confirmed': self.is_confirmed,
            'is_ptm': self.is_ptm()
        }

    @classmethod
    def in_sources(cls, *sources: MutationSource, conjunction=and_):

        return conjunction(
            (
                (
                    source_manager.get_relationship(source).any()
                    if are_details_managed(source) else
                    source_manager.get_relationship(source).has()
                )
                for source in sources
            )
        )


def confirmed_mutation_sources():
    return {
        source.name: source
        for source in source_manager.confirmed
    }


def ensure_mutations_are_precomputed(context: str):
    for source in confirmed_mutation_sources().values():
        mutations_missing_precomputed_status = (
            Mutation
            .query
            .filter(Mutation.in_sources(source))
            .filter(Mutation.precomputed_is_ptm == None)
            .count()
        )
        if mutations_missing_precomputed_status != 0:
            raise ValueError(
                f'{context} requires mutation PTM status to be precomputed for all mutations.'
                f'\n{mutations_missing_precomputed_status} {source} mutations are missing precomputed status.'
                f'\nRun `./manage.py load protein_related precompute_ptm_mutations` first'
            )