KarrLab/datanator_query_python

View on GitHub
datanator_query_python/query/query_taxon_tree.py

Summary

Maintainability
F
4 days
Test Coverage
A
91%
from datanator_query_python.util import mongo_util, chem_util, file_util
from datanator_query_python.aggregate import pipelines
import os
import json
from pymongo.collation import Collation, CollationStrength
import pymongo


class QueryTaxonTree(mongo_util.MongoUtil):
    '''Queries specific to taxon_tree collection
    '''

    def __init__(self,
                 cache_dirname=None,
                 collection_str='taxon_tree',
                 verbose=False,
                 max_entries=float('inf'),
                 username=None,
                 MongoDB=None,
                 password=None,
                 db='datanator-test',
                 authSource='admin',
                 readPreference='nearest',
                 replicaSet=None):
        '''Open the MongoDB connection and prepare the query helpers.

            Args:
                collection_str (:obj:`str`): name of the collection to query
                cache_dirname, MongoDB, db, verbose, max_entries, username,
                password, authSource, readPreference, replicaSet: forwarded
                    unchanged to ``mongo_util.MongoUtil.__init__``
        '''
        # Set before the parent initializer runs, same order as always.
        self.collection_str = collection_str
        connection_options = {
            'cache_dirname': cache_dirname,
            'MongoDB': MongoDB,
            'db': db,
            'verbose': verbose,
            'max_entries': max_entries,
            'username': username,
            'password': password,
            'authSource': authSource,
            'readPreference': readPreference,
            'replicaSet': replicaSet,
        }
        super().__init__(**connection_options)

        self.pipeline_manager = pipelines.Pipeline()
        self.chem_manager = chem_util.ChemUtil()
        self.file_manager = file_util.FileUtil()
        # NOTE(review): ``db_obj`` is presumably created by the parent
        # initializer; it is not defined anywhere in this class.
        self.collection = self.db_obj[self.collection_str]
        # English collation at secondary strength, used for name lookups.
        self.collation = Collation(locale='en', strength=CollationStrength.SECONDARY)

    def get_all_species(self):
        ''' Stream the name of every organism in the collection.

            This is a generator: the query is only issued once iteration starts.

            Return:
                (:obj:`generator` of :obj:`str`): one JSON string per document
                that has a ``tax_name`` field, shaped like ``{"tax_name": <name>}``
        '''
        has_name = {'tax_name': {'$exists': True}}
        cursor = self.collection.find(has_name, projection={'tax_name': 1})
        yield from (json.dumps({'tax_name': record['tax_name']}) for record in cursor)

    def get_ids_by_name(self, name):
        '''
            Look up the taxon ids of every document that matches an
            organism name through the collection's text search.
            Args:
                name (:obj:`str`): species name
            Returns:
                ids (:obj:`list` of :obj:`int`): ``tax_id`` of each matching
                document, in the order the query returns them
        '''
        # The name is wrapped in double quotes before being handed to $text
        # (MongoDB treats a quoted search string as a phrase).
        quoted = '"' + name + '"'
        matches = self.collection.find(filter={'$text': {'$search': quoted}},
                                       projection={'tax_id': 1, '_id': 0})
        return [match['tax_id'] for match in matches]

    def get_name_by_id(self, ids):
        ''' Map taxon ids to organism names.
            Args:
                ids (:obj:`list`): list of organisms' tax_ids
            Return:
                (:obj:`dict`): ``{tax_id: tax_name}`` for every id found in the
                collection; ids without a document are simply absent.
                ``{-1: 'none in database'}`` if the query yields ``None``.
        '''
        found = self.collection.find(filter={'tax_id': {'$in': ids}},
                                     projection={'_id': 0, 'tax_name': 1, 'tax_id': 1})
        if found is None:
            return {-1: 'none in database'}
        return {entry['tax_id']: entry['tax_name'] for entry in found}

    def get_anc_by_name(self, names):
        ''' Fetch the ancestor lineage of each organism, looked up by name
            with the instance's collation.
            Args:
                names: list of organism's names e.g. Candidatus Diapherotrites
            Return:
                result_id: one entry per input name - the document's ``anc_id``
                    list, or ``[]`` when no document matches
                result_name: one entry per input name - the document's
                    ``anc_name`` list, or ``[]`` when no document matches
        '''
        lineage_ids = []
        lineage_names = []
        wanted = {'_id': 0, 'anc_id': 1, 'anc_name': 1}
        for organism in names:
            hit = self.collection.find_one(filter={'tax_name': organism},
                                           collation=self.collation,
                                           projection=wanted)
            if hit is None:
                # Unknown name: keep the outputs aligned with the inputs.
                lineage_ids.append([])
                lineage_names.append([])
                continue
            lineage_ids.append(hit['anc_id'])
            lineage_names.append(hit['anc_name'])
        return lineage_ids, lineage_names

    def get_anc_by_id(self, ids):
        ''' Fetch the ancestor lineage of each organism, looked up by tax_id.
            Args:
                ids: list of organism's ids e.g.[12345, 234456]

            Return:
                (:obj:`tuple` of :obj:`list`): ``(ancestor_ids, ancestor_names)``,
                each with one entry per input id. An id without a document
                yields ``[-1]`` and ``['not in database']`` respectively.
        '''
        wanted = {'_id': 0, 'anc_id': 1, 'anc_name': 1}
        lineage_ids, lineage_names = [], []
        for tax_id in ids:
            record = self.collection.find_one({'tax_id': tax_id}, projection=wanted)
            if record is not None:
                lineage_ids.append(record['anc_id'])
                lineage_names.append(record['anc_name'])
            else:
                # Sentinel pair that callers test for to detect unknown ids.
                lineage_ids.append([-1])
                lineage_names.append(['not in database'])
        return lineage_ids, lineage_names

    def get_common_ancestor(self, org1, org2, org_format='name'):
        ''' Get the closest common ancestor between
            two organisms and their distances to the
            said ancestor
            Args:
                org1: organism 1
                org2: organism 2
                org_format: ``'name'`` to look the organisms up by name; any
                    other value looks them up by tax_id
            Return:
                ancestor: the common entry picked by ``file_manager.get_common``
                    from the two ``anc_id`` lists (i.e. a taxon id), or a
                    message string when no answer can be given
                distance: ``[d1, d2]`` where each value is the length of that
                    organism's ancestor list minus the ancestor's index in it;
                    ``[0, 0]`` for identical names or missing input,
                    ``[-1, -1]`` when there is no common ancestor
        '''
        if org1 is None or org2 is None:
            return ('Enter organism information', [0, 0])

        if org_format == 'name':
            # Same organism (ignoring case): no lookup needed.
            if org1.upper() == org2.upper():
                return (org1, [0, 0])
            anc_ids, _ = self.get_anc_by_name([org1, org2])
        else:
            anc_ids, _ = self.get_anc_by_id([org1, org2])

        org1_anc, org2_anc = anc_ids[0], anc_ids[1]

        # get_anc_by_id marks an unknown organism with the sentinel [-1].
        # Never let that sentinel be matched as if it were a real ancestor
        # (two unknown ids would otherwise "share" ancestor -1).
        if org1_anc == [-1] or org2_anc == [-1]:
            return ('No common ancestor', [-1, -1])

        ancestor = self.file_manager.get_common(org1_anc, org2_anc)
        if ancestor == '':
            return ('No common ancestor', [-1, -1])

        distance1 = len(org1_anc) - org1_anc.index(ancestor)
        distance2 = len(org2_anc) - org2_anc.index(ancestor)
        return (ancestor, [distance1, distance2])

    def get_rank(self, ids):
        ''' Given a list of taxon ids, return
            the list of ranks. no rank = '+'
            Args:
                ids: list of taxon ids [1234,2453,431]
            Return:
                ranks: one entry per input id, e.g. ['kingdom', '+', 'phylum'].
                    A canonical rank is returned as stored; tax_id 131567 is
                    reported as 'cellular organisms'; everything else -
                    including ids that have no document - is '+'.
        '''
        canonical_ranks = ('species', 'genus', 'family', 'order', 'class',
                           'phylum', 'kingdom', 'superkingdom')
        projection = {'rank': 1, 'tax_id': 1}
        ranks = []
        for _id in ids:
            doc = self.collection.find_one(filter={'tax_id': _id}, projection=projection)
            if doc is None:
                # Unknown id: report "no rank" instead of failing on None.
                ranks.append('+')
                continue
            rank = doc.get('rank', None)
            if rank in canonical_ranks:
                ranks.append(rank)
            elif doc.get('tax_id', None) == 131567:
                ranks.append('cellular organisms')
            else:
                ranks.append('+')
        return ranks

    def get_equivalent_species(self, _id, max_distance, max_depth=float('inf')):
        '''
            Get equivalent species of species with tax_id _id,
            given the max taxonomic distances, for instance, given three species
            {'tax_id': 8, 'anc_id': [5,4,3,2,6,7]}
            {'tax_id': 9, 'anc_id': [5,4,3]}
            {'tax_id': 0, 'anc_id': [5,4,3,2,1]}
            the equivalent species of 0 given max_distance of 2, is 8
            the equivalent species of 0 given max_distance of 3, is 8 and 9
            Args:
                _id (:obj:`int`): taxonomy id of the species
                max_distance (:obj:`int`): max distance allowed from species _id
                max_depth (:obj:`int`) max depth allowed from the common node
            Returns:
                ids (:obj:`list` of :obj:`int`): list of ids of the species that met the condition
                names (:obj:`list` of :obj:`str`) list of names of the species that met the condition

                Both lists are empty when ``_id`` has no document. If either
                limit is below 1 an explanatory string is returned instead of
                the tuple.
        '''
        if max_distance < 1 or max_depth < 1:
            return 'Either input has to be greater than 0'
        ids = []
        names = []
        # Only the ancestor list of the source organism is needed here.
        source = self.collection.find_one(filter={'tax_id': _id}, projection={'anc_id': 1})
        if source is None:
            return ids, names
        ancestor_ids = source['anc_id']
        total = len(ancestor_ids)

        checked_ids = [_id]
        projection = {'anc_id': 1, 'anc_name': 1, 'tax_id': 1, 'tax_name': 1}

        for level in range(min(total, max_distance)):
            # Walk up one ancestor per level: level 0 requires the full
            # lineage to be shared, level 1 all but the last entry, etc.
            common_ancestors = ancestor_ids[:total - level]
            length = len(common_ancestors)

            # Candidates share the lineage prefix but are neither one of the
            # already-visited nodes nor located underneath one of them.
            query = {'$and': [{'anc_id': {'$all': common_ancestors}},
                              {'tax_id': {'$nin': checked_ids}},
                              {'anc_id': {'$nin': checked_ids}}]}

            for equivalent in self.collection.find(filter=query, projection=projection):
                if 0 <= len(equivalent['anc_id']) - length < max_depth:
                    ids.append(equivalent['tax_id'])
                    names.append(equivalent['tax_name'])

            # The ancestor handled at this level is excluded from later levels.
            checked_ids.append(ancestor_ids[-(level + 1)])

        return ids, names

    def get_canon_rank_distance(self, _id, front_end=False):
        '''Given the ncbi_id, return canonically-ranked ancestors
            along the lineage and their non-canonical distances

        Args:
            _id (:obj:`int`): ncbi_id of the organism.
            front_end (:obj:`bool`): when true, the organism itself (distance 0)
                is prepended and the first entry of its ``anc_name`` list
                (distance = length of that list) is appended.

        Return:
            (:obj:`list` of :obj:`dict`): canonical organisms and distances
            e.g. [{'a':1}, {'b': 3}, ...], nearest ancestor first.
            ``[{_id: 0}]`` when the organism has no document, mirroring
            ``get_canon_rank_distance_by_name``.
        '''
        roi = ('species', 'genus', 'family', 'order', 'class', 'phylum', 'kingdom', 'superkingdom')
        projection = {'rank': 1, '_id': 0, 'tax_name': 1}
        anc = self.collection.find_one(filter={'tax_id': _id},
                                       projection={'anc_id': 1, 'tax_name': 1, 'anc_name': 1})
        if anc is None:
            return [{_id: 0}]

        result = []
        if front_end:
            result.append({anc['tax_name']: 0})
        # anc_id is walked backwards so the distance grows by one per step.
        for distance, tax_id in enumerate(reversed(anc['anc_id']), start=1):
            doc = self.collection.find_one(filter={'tax_id': tax_id}, projection=projection)
            if doc is None:
                # Ancestor listed in the lineage but missing from the collection.
                continue
            if doc.get('rank', None) in roi:
                result.append({doc['tax_name']: distance})
        # An organism without ancestors has no first lineage entry to append.
        if front_end and anc['anc_name']:
            result.append({anc['anc_name'][0]: len(anc['anc_name'])})
        return result

    def get_canon_rank_distance_by_name(self, name, front_end=False):
        '''Given the name of species, return canonically-ranked ancestors
            along the lineage and their non-canonical distances

        Args:
            name (:obj:`str`): name of the organism (matched with the
                instance's collation).
            front_end (:obj:`bool`): when true, the organism itself (distance 0)
                is prepended and the first entry of its ``anc_name`` list
                (distance = length of that list) is appended.

        Return:
            (:obj:`list` of :obj:`dict`): canonical organisms and distances,
            nearest ancestor first. Each canonical entry also carries its rank,
            e.g. [{'a': 1, 'rank': 'genus'}, {'b': 3, 'rank': 'family'}, ...].
            ``[{name: 0}]`` when the organism has no document.
        '''
        roi = ('species', 'genus', 'family', 'order', 'class', 'phylum', 'kingdom', 'superkingdom')
        projection = {'rank': 1, '_id': 0, 'tax_name': 1}
        anc = self.collection.find_one(filter={'tax_name': name},
                                       projection={'anc_id': 1, 'tax_name': 1, 'anc_name': 1},
                                       collation=self.collation)
        if anc is None:
            return [{name: 0}]

        result = []
        if front_end:
            result.append({anc['tax_name']: 0})
        # anc_id is walked backwards so the distance grows by one per step.
        for distance, tax_id in enumerate(reversed(anc['anc_id']), start=1):
            doc = self.collection.find_one(filter={'tax_id': tax_id}, projection=projection)
            if doc is None:
                # Ancestor listed in the lineage but missing from the collection.
                continue
            rank = doc.get('rank', None)
            if rank in roi:
                result.append({doc['tax_name']: distance,
                               'rank': rank})
        # An organism without ancestors has no first lineage entry to append.
        if front_end and anc['anc_name']:
            result.append({anc['anc_name'][0]: len(anc['anc_name'])})
        return result

    def under_category(self, src_tax_id, target_tax_id):
        """Tell whether the target organism appears in the source's lineage.

        Args:
            src_tax_id (:obj:`int`): source organism taxonomic ID.
            target_tax_id (:obj:`int`): target organism taxonomic ID.

        Return:
            (:obj:`bool`): ``True`` when ``target_tax_id`` is listed in the
            source document's ``anc_id``; ``False`` otherwise, including when
            the source has no document.
        """
        record = self.collection.find_one(filter={'tax_id': src_tax_id},
                                          projection={'anc_id': 1})
        if record is None:
            return False
        return target_tax_id in record['anc_id']

    def each_under_category(self, src_tax_ids, target_tax_id):
        """Given a list of source organism IDs, check if each ID
        is the child of target organism.

        The result always has exactly one entry per element of
        ``src_tax_ids``, in the same order. IDs without a document (and
        repeated IDs) therefore still get their own boolean.

        Args:
            src_tax_ids (:obj:`list` of :obj:`int`): List of NCBI Taxonomy IDs.
            target_tax_id (:obj:`int`): Target organism ID.

        Return:
            (:obj:`list` of :obj:`bool`): ``True`` where ``target_tax_id`` is in
            the source document's ``anc_id``; ``False`` otherwise, including
            for IDs that are not in the collection.
        """
        pipeline = [
            {'$match': {'tax_id': {'$in': src_tax_ids}}},
            {'$project': {'_id': 0, 'tax_id': 1, 'anc_id': 1}},
        ]
        # One round trip; re-alignment with the input happens locally because
        # $match silently drops unknown IDs and returns duplicates only once.
        lineage_by_id = {}
        for doc in self.collection.aggregate(pipeline):
            lineage_by_id.setdefault(doc['tax_id'], doc.get('anc_id') or [])
        return [target_tax_id in lineage_by_id.get(tax_id, ()) for tax_id in src_tax_ids]

    def get_canon_common_ancestor(self, org1, org2, org_format='tax_id'):
        ''' Get the closest common canonically-ranked ancestor between
            two organisms and their distances to the
            said ancestor
            Args:
                org1: organism 1
                org2: organism 2
                org_format: ``'tax_id'`` or ``'tax_name'``

            Return:
                (:obj:`dict`): on success, ``str(org)`` maps to that organism's
                distance (length of its canonical ancestor list minus the
                common ancestor's index) and ``str(org) + '_canon_ancestors'``
                to the list of canonical ancestor names. On failure both
                organisms map to ``-1`` and ``'reason'`` explains why; missing
                input yields only ``{'reason': ...}``.
        '''
        if org1 is None or org2 is None:
            return {'reason': 'Needs two organisms.'}

        if org_format == 'tax_id':
            anc_ids, anc_names = self.get_anc_by_id([org1, org2])
        elif org_format == 'tax_name':
            anc_ids, anc_names = self.get_anc_by_name([org1, org2])
        else:
            # Report the problem in the same shape as the other failures
            # rather than hitting an unbound local further down.
            return {str(org1): -1, str(org2): -1,
                    'reason': 'Unsupported org_format: {}'.format(org_format)}

        org1_anc, org2_anc = anc_ids[0], anc_ids[1]
        org1_anc_name, org2_anc_name = anc_names[0], anc_names[1]

        # [-1] is the "not in database" sentinel produced by get_anc_by_id.
        if org1_anc == [-1]:
            return {str(org1): -1, str(org2): -1, 'reason': 'No such organism found: {}'.format(org1)}
        elif org2_anc == [-1]:
            return {str(org1): -1, str(org2): -1, 'reason': 'No such organism found: {}'.format(org2)}

        # Keep only the ancestors whose rank is not the '+' placeholder.
        rank1_anc = self.get_rank(org1_anc)
        rank2_anc = self.get_rank(org2_anc)
        canon_anc_1 = [anc for (anc, rank) in zip(org1_anc_name, rank1_anc) if rank != '+']
        canon_anc_2 = [anc for (anc, rank) in zip(org2_anc_name, rank2_anc) if rank != '+']

        ancestor = self.file_manager.get_common(canon_anc_1, canon_anc_2)
        if ancestor == '':
            return {str(org1): -1, str(org2): -1, 'reason': 'No common ancestor'}

        distance1 = len(canon_anc_1) - canon_anc_1.index(ancestor)
        distance2 = len(canon_anc_2) - canon_anc_2.index(ancestor)

        return {str(org1): distance1, str(org2): distance2,
                str(org1)+'_canon_ancestors': canon_anc_1,
                str(org2)+'_canon_ancestors': canon_anc_2}

    def get_canon_common_ancestor_fast(self, org1, org2, org_format='tax_id'):
        ''' Get the closest common canonically-ranked ancestor between
            two organisms and their distances to the
            said ancestor, using the ``canon_anc_ids`` / ``canon_anc_names``
            fields stored on each document.
            Args:
                org1: organism 1
                org2: organism 2
                org_format: name of the document field the organisms are
                    matched against, e.g. ``'tax_id'`` or ``'tax_name'``

            Return:
                (:obj:`dict`): ``str(org)`` maps to that organism's distance
                and ``str(org) + '_canon_ancestors'`` to its canonical ancestor
                names. Identical canonical lineages give distance 0 for both.
                When one organism is the last canonical ancestor of the other,
                the distances are 0 (ancestor) and 1 (descendant). On failure
                both organisms map to ``-1`` with a ``'reason'``; missing input
                yields only ``{'reason': ...}``.
        '''
        if org1 is None or org2 is None:
            return {'reason': 'Needs two organisms.'}
        # NOTE(review): database and collection are hard-coded here instead of
        # using self.collection - confirm whether that is intentional.
        collection = self.client["datanator-test"]["taxon_tree"]
        projection = {'canon_anc_ids': 1, 'canon_anc_names': 1}
        doc_1 = collection.find_one({org_format: org1}, projection=projection)
        doc_2 = collection.find_one({org_format: org2}, projection=projection)
        if doc_1 is None or doc_2 is None:
            return {str(org1): -1, str(org2): -1, 'reason': 'No organism found.'}

        canon_anc_1 = doc_1["canon_anc_names"]
        canon_anc_2 = doc_2["canon_anc_names"]

        if canon_anc_1 == canon_anc_2:
            return {str(org1): 0, str(org2): 0,
                    str(org1)+'_canon_ancestors': canon_anc_1,
                    str(org2)+'_canon_ancestors': canon_anc_2}

        # The direct-parent shortcut must compare like with like: ids when the
        # organisms are given as tax_ids, names otherwise.
        if org_format == 'tax_id':
            lineage_1 = doc_1.get('canon_anc_ids') or []
            lineage_2 = doc_2.get('canon_anc_ids') or []
        else:
            lineage_1, lineage_2 = canon_anc_1, canon_anc_2

        # Guard against empty lineages before looking at the last entry.
        if lineage_1 and lineage_1[-1] == org2:
            distance1, distance2 = 1, 0
        elif lineage_2 and lineage_2[-1] == org1:
            distance1, distance2 = 0, 1
        else:
            distance1 = distance2 = -1

        ancestor = self.file_manager.get_common(canon_anc_1, canon_anc_2)
        if ancestor == '':
            return {str(org1): -1, str(org2): -1, 'reason': 'No common ancestor.'}

        # -1 means the shortcut did not apply; derive the distance from the
        # position of the common ancestor instead.
        if distance1 == -1:
            distance1 = len(canon_anc_1) - canon_anc_1.index(ancestor)
        if distance2 == -1:
            distance2 = len(canon_anc_2) - canon_anc_2.index(ancestor)

        return {str(org1): distance1, str(org2): distance2,
                str(org1)+'_canon_ancestors': canon_anc_1,
                str(org2)+'_canon_ancestors': canon_anc_2}