KarrLab/datanator_query_python

View on GitHub
datanator_query_python/query/query_sabiork_old.py

Summary

Maintainability
F
1 wk
Test Coverage
C
79%
from datanator_query_python.util import mongo_util, chem_util, file_util
from datanator_query_python.aggregate import lookups
from pymongo.collation import Collation, CollationStrength
from . import query_taxon_tree, query_sabio_compound
import json
import re
from pymongo import ASCENDING, DESCENDING
from collections import deque


class QuerySabioOld(mongo_util.MongoUtil):
    '''Queries specific to sabio_rk collection
    '''

    def __init__(self, cache_dirname=None, MongoDB=None, replicaSet=None, db='datanator',
                 collection_str='sabio_rk_old', verbose=False, max_entries=float('inf'), username=None,
                 password=None, authSource='admin', readPreference='nearest'):
        self.max_entries = max_entries
        super().__init__(cache_dirname=cache_dirname, MongoDB=MongoDB,
                        replicaSet=replicaSet, db=db,
                        verbose=verbose, max_entries=max_entries, username=username,
                        password=password, authSource=authSource, readPreference=readPreference)
        self.u = self.client["datanator-test"]["uniprot"]
        self.chem_manager = chem_util.ChemUtil()
        self.file_manager = file_util.FileUtil()
        self.collection = self.db_obj[collection_str]
        self.collection_str = collection_str
        self.taxon_manager = query_taxon_tree.QueryTaxonTree(username=username, password=password,
        authSource=authSource, readPreference=readPreference, MongoDB=MongoDB, replicaSet=replicaSet)
        self.compound_manager = query_sabio_compound.QuerySabioCompound(server=MongoDB, database=db,
                                                                        username=username, password=password, 
                                                                        readPreference=readPreference, authSource=authSource,
                                                                        replicaSet=replicaSet)
        self.collation = Collation(locale='en', strength=CollationStrength.SECONDARY)

    def get_kinlaw_by_environment(self, taxon=None, taxon_wildtype=None, ph_range=None, temp_range=None,
                          name_space=None, param_type=None, projection={'_id': 0}):
        """get kinlaw info based on experimental conditions
        
        Args:
            taxon (:obj:`list`, optional): list of ncbi taxon id
            taxon_wildtype (:obj:`list` of :obj:`bool`, optional): True indicates wildtype and False indicates mutant
            ph_range (:obj:`list`, optional): range of pH
            temp_range (:obj:`list`, optional): range of temperature
            name_space (:obj:`dict`, optional): cross_reference key/value pair, i.e. {'ec-code': '3.4.21.62'}
            param_type (:obj:`list`, optional): possible values for parameters.type
            projection (:obj:`dict`, optional): mongodb query result projection

        Returns:
            (:obj:`tuple`) consisting of 
            docs (:obj:`list` of :obj:`dict`): list of docs;
            count (:obj:`int`): number of documents found 
        """
        all_constraints = []
        taxon_wildtype = [int(x) for x in taxon_wildtype]
        if taxon:
            all_constraints.append({'taxon_id': {'$in': taxon}})
        if taxon_wildtype:
            all_constraints.append({'taxon_wildtype': {'$in': taxon_wildtype}})
        if ph_range:
            all_constraints.append({'ph': {'$gte': ph_range[0], '$lte': ph_range[1]}})
        if temp_range:
            all_constraints.append({'temperature': {'$gte': temp_range[0], '$lte': temp_range[1]}})
        if name_space:
            key = list(name_space.keys())[0]
            val = list(name_space.values())[0]
            all_constraints.append({"resource": {'$elemMatch': {'namespace': key, 'id': val}}})
        if param_type:
            all_constraints.append({'parameter': {'$elemMatch': {'type': {'$in': param_type}}}})

        query = {'$and': all_constraints}
        docs = self.collection.find(filter=query, projection=projection)
        count = self.collection.count_documents(query)
        return docs, count

    def get_reaction_doc(self, kinlaw_id, projection={'_id': 0}):
        '''Find a document on reaction with the kinlaw_id
        Args:
            kinlaw_id (:obj:`list` of :obj:`int`) list of kinlaw_id to search for
            projection (:obj:`dict`): mongodb query result projection

        Returns:
            (:obj:`tuple`) consisting of 
            docs (:obj:`list` of :obj:`dict`): list of docs;
            count (:obj:`int`): number of documents found
        '''
        query = {'kinlaw_id': {'$in': kinlaw_id}}
        docs = self.collection.find(filter=query, projection=projection)
        count = self.collection.count_documents(query)
        return docs, count

    def get_kinlawid_by_rxn(self, substrates, products, dof=0):
        ''' Find the kinlaw_id defined in sabio_rk using 
            rxn participants' inchikey

            Args:
                substrates (:obj:`list`): list of substrates' inchikey
                products (:obj:`list`): list of products' inchikey
                dof (:obj:`int`, optional): degree of freedom allowed (number of parts of
                                  inchikey to truncate); the default is 0 

            Return:
                rxns: list of kinlaw_ids that satisfy the condition
                [id0, id1, id2,...,  ]
        '''
        result = deque()
        substrate = 'reaction_participant.substrate_aggregate'
        product = 'reaction_participant.product_aggregate'
        projection = {'kinlaw_id': 1, '_id': 0}
        if dof == 0:
            substrates = substrates
            products = products
        elif dof == 1:
            substrates = [re.compile('^' + x[:-2]) for x in substrates]
            products = [re.compile('^' + x[:-2]) for x in products]
        else:
            substrates = [re.compile('^' + x[:14]) for x in substrates]
            products = [re.compile('^' + x[:14]) for x in products]

        constraint_0 = {substrate: {'$all': substrates}}
        constraint_1 = {product: {'$all': products}}
        query = {'$and': [constraint_0, constraint_1]}
        docs = self.collection.find(filter=query, projection=projection)
        if docs is not None:
            for doc in docs:
                result.append(doc['kinlaw_id'])
        return result

    def get_kinlaw_by_rxn(self, substrates, products, dof=0,
                          projection={'kinlaw_id': 1, '_id': 0},
                          bound='loose', skip=0, limit=0):
        ''' Find the kinlaw_id defined in sabio_rk using 
            rxn participants' inchikey

            Args:
                substrates (:obj:`list`): list of substrates' inchikey
                products (:obj:`list`): list of products' inchikey
                dof (:obj:`int`, optional): degree of freedom allowed (number of parts of
                                  inchikey to truncate); the default is 0
                projection (:obj:`dict`): pymongo query projection 
                bound (:obj:`str`): limit substrates/products to include only input values

            Return:
                (:obj:`list` of :obj:`dict`): list of kinlaws that satisfy the condition
        '''
        substrate = 'reaction_participant.substrate_aggregate'
        product = 'reaction_participant.product_aggregate'
        if dof == 0:
            substrates = substrates
            products = products
        elif dof == 1:
            substrates = [re.compile('^' + x[:-2]) for x in substrates]
            products = [re.compile('^' + x[:-2]) for x in products]
        else:
            substrates = [re.compile('^' + x[:14]) for x in substrates]
            products = [re.compile('^' + x[:14]) for x in products]

        if bound == 'loose':
            constraint_0 = {substrate: {'$all': substrates}}
            constraint_1 = {product: {'$all': products}}
            constraint_2 = {"taxon_id": {"$ne": None}}
        else:
            constraint_0 = {substrate: substrates}
            constraint_1 = {product: products}
            constraint_2 = {"taxon_id": {"$ne": None}}            
        query = {'$and': [constraint_0, constraint_1, constraint_2]}
        lookup = lookups.Lookups().simple_lookup("kegg_orthology", "resource.id", "definition.ec_code", "kegg_meta")
        if limit > 0:
            pipeline = [{"$match": query}, {"$limit": limit}, {"$skip": skip}, lookup, {"$project": projection}]
        else:
            pipeline = [{"$match": query}, {"$skip": skip}, lookup, {"$project": projection}]
        docs = self.collection.aggregate(pipeline)
        count = self.collection.count_documents(query)
        return count, docs

    def get_kinlaw_by_rxn_ortho(self, substrates, products, dof=0,
                          projection={'kinlaw_id': 1, '_id': 0, "enzymes": 1},
                          bound='loose', skip=0, limit=0):
        ''' Find the kinlaw_id defined in sabio_rk using 
            rxn participants' inchikey

            Args:
                substrates (:obj:`list`): list of substrates' inchikey
                products (:obj:`list`): list of products' inchikey
                dof (:obj:`int`, optional): degree of freedom allowed (number of parts of
                                  inchikey to truncate); the default is 0
                projection (:obj:`dict`): pymongo query projection 
                bound (:obj:`str`): limit substrates/products to include only input values

            Return:
                (:obj:`list` of :obj:`dict`): list of kinlaws that satisfy the condition
        '''
        substrate = 'reaction_participant.substrate_aggregate'
        product = 'reaction_participant.product_aggregate'
        if dof == 0:
            substrates = substrates
            products = products
        elif dof == 1:
            substrates = [re.compile('^' + x[:-2]) for x in substrates]
            products = [re.compile('^' + x[:-2]) for x in products]
        else:
            substrates = [re.compile('^' + x[:14]) for x in substrates]
            products = [re.compile('^' + x[:14]) for x in products]

        if bound == 'loose':
            constraint_0 = {substrate: {'$all': substrates}}
            constraint_1 = {product: {'$all': products}}
            constraint_2 = {"taxon_id": {"$ne": None}}
        else:
            constraint_0 = {substrate: substrates}
            constraint_1 = {product: products}
            constraint_2 = {"taxon_id": {"$ne": None}}            
        query = {'$and': [constraint_0, constraint_1, constraint_2]}
        # lookup = lookups.Lookups().simple_lookup("kegg_orthology", "resource.id", "definition.ec_code", "kegg_meta")
        # if limit > 0:
        #     pipeline = [{"$match": query}, {"$limit": limit}, {"$skip": skip}, lookup, {"$project": projection}]
        # else:
        #     pipeline = [{"$match": query}, {"$skip": skip}, lookup, {"$project": projection}]
        docs = self.collection.find(filter=query,
                                    limit=limit,
                                    skip=skip,
                                    projection=projection)
        cache = {}
        result = []
        for doc in docs:
            try:
                u_id = doc["enzymes"][2]["subunit"][0]["uniprot_id"]
                if u_id is not None:
                    if cache.get(u_id) is None:
                        x = self.u.find_one(filter={"uniprot_id": u_id},
                                        projection={"orthodb_id": 1,
                                                    "orthodb_name": 1})
                        cache[u_id] = x
                    else:
                        x = cache.get(u_id)
                    doc["orthodb_id"] = x["orthodb_id"]
                    doc["orthodb_name"] = x["orthodb_name"]
                else:
                    doc["orthodb_id"] = None
                    doc["orthodb_name"] = None
            except:
                doc["orthodb_id"] = None
                doc["orthodb_name"] = None
            doc.pop("enzymes", None)
            result.append(doc)
        count = self.collection.count_documents(query)
        return count, result

    def get_kinlaw_by_entryid(self, entry_id):
        """Find reactions by sabio entry id
        
        Args:
            entry_id (:obj:`int`): entry_id

            Return:
                (:obj:`dict`): {'kinlaw_id': [], 'substrates': [], 'products': []}
        """
        result = {}
        kinlaw_id = deque()
        substrates = deque()
        products = deque()
        constraint_0 = {'namespace': 'sabiork.reaction', 'id': str(entry_id)}
        query = {'resource': {'$elemMatch': constraint_0}}
        projection = {'_id': 0, 'kinlaw_id': 1, 'reaction_participant.substrate_aggregate': 1,
                     'reaction_participant.product_aggregate': 1}
        docs = self.collection.find(filter=query, projection=projection)
        for doc in docs:
            kinlaw_id.append(doc['kinlaw_id'])
            substrates.append(doc['reaction_participant'][3]['substrate_aggregate'])
            products.append(doc['reaction_participant'][4]['product_aggregate'])

        result['kinlaw_id'] = kinlaw_id
        result['substrates'] = substrates
        result['products'] = products
        return result


    def get_info_by_entryid(self, entry_id, target_organism=None, size=10, last_id=0):
        """Find reactions by sabio entry id, return all information
        
        Args:
            entry_id (:obj:`int`): entry_id
            target_organism (:obj:`str`): the organism in which the reaction occurs
            size (:obj:`int`): pagination page size
            last_id (:obj:`int`) the largest kinlaw id from previous page

        Return:
            (:obj:`list` of :obj:`dict`): list of documents of entry id
        """
        constraint_0 = {'namespace': 'sabiork.reaction', 'id': str(entry_id)}
        query = {'resource': {'$elemMatch': constraint_0}}
        projection = {'_id': 0}
        sort = [('kinlaw_id', ASCENDING)]
        taxon_name = None
        distance = -1
        result = deque()
        docs = self.collection.find(filter=query, projection=projection, sort=sort, limit=size)
        if target_organism is not None:  # need distance information
            for i, doc in enumerate(docs):
                if i == 0:
                    taxon_name = doc['taxon_name']
                    _, dist = self.taxon_manager.get_common_ancestor(taxon_name, target_organism)
                    distance = dist[0]
                    doc['taxon_distance'] = distance
                    result.append(doc)
                else:
                    doc['taxon_distance'] = distance
                    result.append(doc)
        else:
            for doc in docs:
                result.append(doc)
        return result

    def get_kinlaw_by_rxn_name(self, substrates, products,
                                projection={"kegg_meta.gene_ortholog": 0, 'kegg_meta._id': 0, '_id': 0},
                                bound='loose', skip=0, limit=0):
        ''' Find the kinlaw_id defined in sabio_rk using 
            rxn participants' names

            Args:
                substrates (:obj:`list`): list of substrates' names
                products (:obj:`list`): list of products' names
                projection (:obj:`dict`): pymongo query projection 
                bound (:obj:`str`): limit substrates/products to include only input values

            Return:
                (:obj:`list` of :obj:`dict`): list of kinlaws that satisfy the condition
        '''
        sub_key_field = 'substrate_names'
        pro_key_field = 'product_names'
        bounded_s = {sub_key_field: {'$size': len(substrates)}}
        bounded_p = {pro_key_field: {'$size': len(products)}}

        s_constraint = {sub_key_field: {'$all': substrates}}
        p_constraint = {pro_key_field: {'$all': products}}

        if bound == 'loose':
            query = {'$and': [s_constraint, p_constraint]}
        else:
            query = {'$and': [s_constraint, p_constraint, bounded_s, bounded_p]}
        # docs = self.collection.find(filter=query, projection=projection,
        #                             skip=skip, limit=limit)
        lookup = lookups.Lookups().simple_lookup("kegg_orthology", "resource.id", "definition.ec_code", "kegg_meta")
        if limit > 0:
            pipeline = [{"$match": query}, {"$limit": limit}, {"$skip": skip}, lookup, {"$project": projection}]
        else:
            pipeline = [{"$match": query}, {"$skip": skip}, lookup, {"$project": projection}]
        docs = self.collection.aggregate(pipeline)
        count = self.collection.count_documents(query)
        return count, docs

    def get_unique_entries(self):
        """Get number of unique curated entries.

        Return:
            (:obj:`int`): Number of unique entries.
        """
        return len(self.collection.distinct('kinlaw_id'))

    def get_unique_organisms(self):
        """Get number of unique organisms.

        Return:
            (:obj:`int`): Number of unique organisms.
        """
        return len(self.collection.distinct('taxon_id'))

    def get_rxn_with_prm(self, kinlaw_ids, _from=0, size=10):
        """Given a list of kinlaw ids, return documents where
        kinlaw has at least one Km or kcat.
        
        Args:
            kinlaw_ids (:obj:`list` of :obj:`int`): List of kinlaw IDs.
            _from (:obj:`int`): record offset. Defaults to 0.
            size (:obj:`int`): number of records to be returned. Defaults to 10.

        Return:
            (:obj:`tuple` of :obj:`list` of :obj:`dict` and :obj:`list` of :obj:`int`): list of rxn documents, and ids that have parameter
        """
        result = deque()
        have = deque()
        con_0 = {'parameter.observed_name': {'$in': ['Km', 'kcat']}}
        con_1 = {'kinlaw_id': {'$in': kinlaw_ids}}
        query = {'$and': [con_0, con_1]}
        projection = {'_id': 0}
        cursor = self.collection.find(filter=query, projection=projection, collation=self.collation,
                                      skip=_from, limit=size)
        if cursor is None:
            return [], []
        for r in cursor:
            result.append(r)
            have.append(r['kinlaw_id'])
        return result, have

    def get_reaction_by_subunit(self, _ids):
        """Get reactions by enzyme subunit uniprot IDs
        
        Args:
            _ids (:obj:`list` of :obj:`str`): List of uniprot IDs.

        Return:
            (:obj:`list` of :obj:`str`): List of kinlaw IDs.
        """
        result = deque()
        entry_ids = set()
        projection = {'_id': 0, 'ec_meta': 1, 'substrates': 1, 'products': 1, 
                      'kinlaw_id': 1, 'resource': 1, 'reaction_participant.substrate_aggregate': 1,
                      'reaction_participant.product_aggregate': 1}
        pipeline = [
             {'$match': {'enzymes.subunit.uniprot_id': {'$in': _ids}}},
             {'$addFields': {"__order": {'$indexOfArray': [_ids, "$enzymes.subunit.uniprot_id"]},
                             "substrates": "$reaction_participant.substrate",
                             "products": "$reaction_participant.product"}},
             {'$sort': {"__order": 1}},
             {"$project": projection}
            ]
        docs = self.collection.aggregate(pipeline)
        if docs is None:
            return ['No reaction found.']
        for doc in docs:
            try:
                entry_id = doc['resource'][-1]['id']
            except IndexError:
                entry_id = -1
            if entry_id in entry_ids:
                continue
            else:
                result.append(doc)
                entry_ids.add(entry_id)
        return result