KarrLab/datanator_query_python

View on GitHub
datanator_query_python/query/query_sabiork.py

Summary

Maintainability
F
3 days
Test Coverage
A
96%
from datanator_query_python.util import mongo_util, chem_util, file_util
import json

class QuerySabio(mongo_util.MongoUtil):
    '''Queries specific to sabio_rk collection
    '''

    def __init__(self, cache_dirname=None, MongoDB=None, replicaSet=None, db='datanator',
                 collection_str='sabio_rk', verbose=False, max_entries=float('inf'), username=None,
                 password=None, authSource='admin'):
        self.max_entries = max_entries
        super().__init__(cache_dirname=cache_dirname, MongoDB=MongoDB,
                        replicaSet=replicaSet, db=db,
                        verbose=verbose, max_entries=max_entries, username=username,
                        password=password, authSource=authSource)
        self.chem_manager = chem_util.ChemUtil()
        self.file_manager = file_util.FileUtil()
        self.collection = self.db_obj[collection_str]

    def get_reaction_doc(self, kinlaw_id):
        '''
            Find a document on reaction with the kinlaw_id
            Args:
                kinlaw_id (:obj:`list` of :obj:`int`) list of kinlaw_id to search for
            Returns:
                result (:obj:`list` of :obj:`dict`): list of docs
        '''
        projection = {'_id': 0}
        query = {'kinlaw_id': {'$in': kinlaw_id}}
        result = []
        docs = self.collection.find(filter=query, projection=projection)
        for doc in docs:
            to_append = json.dumps(doc, indent=4, sort_keys=True, default=str)
            result.append(json.loads(to_append))
        return result

    def find_reaction_participants(self, kinlaw_id):
        ''' Find the reaction participants defined in sabio_rk using kinetic law id
        
            Args:
                kinlaw_id (:obj:`list` of :obj:`int`) list of kinlaw_id to search for

            Return:
                rxns (:obj:`list` of :obj:`dict`) list of dictionaries containing names of reaction participants
                [{'substrates': [], 'products': [] }, ... {} ]
        '''
        projection = {'products': 1, 'reactants': 1, '_id': 0, 'kinlaw_id':1}
        if isinstance(kinlaw_id, list):
            query = {'kinlaw_id': {'$in': kinlaw_id}}
        else:
            query = {'kinlaw_id': kinlaw_id}
        docs = self.collection.find(filter=query, projection=projection)
        rxns = []
        i = 0
        for doc in docs:
            if i == self.max_entries:
                break
            if i % 10 == 0:
                print('Finding reaction participants for kinlaw_id {} ...'.format(
                    doc['kinlaw_id']))

            substrates = self.file_manager.get_val_from_dict_list(doc.get('reactants',), 'name')
            products = self.file_manager.get_val_from_dict_list(doc.get('products',), 'name')

            rxn = {'substrates': substrates, 'products': products}

            rxns.append(rxn)
            i += 1

        return rxns

    def get_kinlawid_by_inchi(self, hashed_inchi):
        ''' Find the kinlaw_id defined in sabio_rk using 
            rxn participants' inchi string
            Args:
                inchi (:obj:`list` of :obj:`str`): list of inchi, all in one rxn
            Return:
                rxns (:obj:`list` of :obj:`int`): list of kinlaw_ids that satisfy the condition
                [id0, id1, id2,...,  ]
        '''
        # hashed_inchi = [self.chem_manager.inchi_to_inchikey(s)
        #                 for s in inchi]
        substrate = 'reactants.structures.InChI_Key'
        product = 'products.structures.InChI_Key'
        projection = {'kinlaw_id': 1}

        id_tally = []
        for inchi in hashed_inchi:
            ids = []
            query = {'$or': [{substrate: inchi}, {product: inchi}]}
            cursor = self.collection.find(filter=query, projection=projection)
            for doc in cursor:
                ids.append(doc['kinlaw_id'])
            id_tally.append(ids)

        return list(set(id_tally[0]).intersection(*id_tally))

    def get_kinlawid_by_rxn(self, substrates, products):
        ''' Find the kinlaw_id defined in sabio_rk using 
            rxn participants' inchi string

            Args:
                substrates: list of substrates' inchi
                products: list of products' inchi

            Return:
                rxns: list of kinlaw_ids that satisfy the condition
                [id0, id1, id2,...,  ]
        '''

        def get_kinlawid(hashed_inchi, side='substrate'):
            ''' Find the kinlaw_id defined in sabio_rk using 
                rxn participants' inchi string

                Args:
                    inchi: list of inchi, all in one rxn, on one side

                Return:
                    rxns: list of kinlaw_ids that satisfy the condition
                    [id0, id1, id2,...,  ]
            '''

            substrate = 'reactants.structures.InChI_Key'
            product = 'products.structures.InChI_Key'
            projection = {'kinlaw_id': 1, '_id': 0}

            id_tally = []
            if side == 'substrate':
                for inchi in hashed_inchi:
                    ids = []
                    query = {substrate: inchi}
                    cursor = self.collection.find(
                        filter=query, projection=projection)
                    for doc in cursor:
                        ids.append(doc['kinlaw_id'])
                    id_tally.append(ids)

                return list(set(id_tally[0]).intersection(*id_tally))
            else:

                for inchi in hashed_inchi:
                    ids = []
                    query = {product: inchi}
                    cursor = self.collection.find(
                        filter=query, projection=projection)
                    for doc in cursor:
                        ids.append(doc['kinlaw_id'])
                    id_tally.append(ids)

                return list(set(id_tally[0]).intersection(*id_tally))

        sub_id = get_kinlawid(substrates, side='substrate')
        pro_id = get_kinlawid(products, side='product')
        result = list(set(sub_id) & set(pro_id))

        return result

    def get_kinlawid_by_name(self, substrates, products):
        ''' Get kinlaw_id from substrates and products, all in one reaction

            Args:
                substrates: (:obj:`list` of :obj:`str`): list of substrate names
                products: (:obj:`list` of :obj:`str`): list of product names

            Returns:
                result: (:obj:`list` of :obj:`str`): list of compound names
        '''
        collation = {'locale': 'en', 'strength': 2}
        projection = {'_id': 0, 'products': 1, 'reactants': 1, 'kinlaw_id': 1}

        def get_kinlawid(compounds, side='substrate'):
            ''' Find the kinlaw_id defined in sabio_rk using 
                rxn participants' name

                Args:
                    compounds (:obj:`list` of :obj:`str`): compound names all in one rxn, on one side
                    side (:obj:`str`): left side or right side of the rxn
                    
                Return:
                    rxns: list of kinlaw_ids that satisfy the condition
                    [id0, id1, id2,...,  ]
            '''
            substrates = 'reactants.name'
            products = 'products.name'
            projection = {'kinlaw_id': 1, '_id': 0}

            id_tally = []
            if side == 'substrate':
                for name in compounds:
                    ids = []
                    query = {substrates: name}
                    cursor = self.collection.find(
                        filter=query, projection=projection, collation=collation)
                    for doc in cursor:
                        ids.append(doc['kinlaw_id'])
                    id_tally.append(ids)

                return list(set(id_tally[0]).intersection(*id_tally))
            else:

                for name in compounds:
                    ids = []
                    query = {products: name}
                    cursor = self.collection.find(
                        filter=query, projection=projection, collation=collation)
                    for doc in cursor:
                        ids.append(doc['kinlaw_id'])
                    id_tally.append(ids)

                return list(set(id_tally[0]).intersection(*id_tally))

        if substrates == None:
            sub_id = self.collection.distinct('kinlaw_id')
        else:
            sub_id = get_kinlawid(substrates, side='substrate')

        if products == None:
            pro_id = self.collection.distinct('kinlaw_id')
        else:
            pro_id = get_kinlawid(products, side='product')
            
        result = list(set(sub_id) & set(pro_id))

        return result

    def get_kinlaw_by_environment(self, taxon=None, taxon_wildtype=None, ph_range=None, temp_range=None,
                          name_space=None, observed_type=None, projection={'_id': 0}):
        """get kinlaw info based on experimental conditions
        
        Args:
            taxon (:obj:`list`, optional): list of ncbi taxon id
            taxon_wildtype (:obj:`list` of :obj:`bool`, optional): True indicates wildtype and False indicates mutant
            ph_range (:obj:`list`, optional): range of pH
            temp_range (:obj:`list`, optional): range of temperature
            name_space (:obj:`dict`, optional): cross_reference key/value pair, i.e. {'ec-code': '3.4.21.62'}
            observed_type (:obj:`list`, optional): possible values for parameters.observed_type
            projection (:obj:`dict`, optional): mongodb query result projection

        Returns:
            (list): list of kinetic laws that meet the constraints 
        """
        all_constraints = []
        if taxon:
            all_constraints.append({'taxon': {'$in': taxon}})
        if taxon_wildtype:    
            all_constraints.append({'taxon_wildtype': {'$in': taxon_wildtype}})
        if ph_range:
            all_constraints.append({'ph': {'$gte': ph_range[0], '$lte': ph_range[1]}})
        if temp_range:    
            all_constraints.append({'temperature': {'$gte': temp_range[0], '$lte': temp_range[1]}})
        if name_space:
            key = list(name_space.keys())[0]
            val = list(name_space.values())[0]
            field = 'cross_references' + '.' + key
            all_constraints.append({field: val})
        if observed_type:
            all_constraints.append({'parameters.observed_type': {'$in': observed_type}})
        
        query = {'$and': all_constraints}
        docs = self.collection.find(filter=query, projection=projection)
        result = []
        for doc in docs:
            result.append(doc)
        
        return result

    def get_subunit_by_id(self, _id):
        """Get protein subunit information by kinlaw_id.
        
        Args:
            _id (:obj:`int`): kinlaw_id.

        Return:
            (:obj:`str`): uniprot_id.
        """
        false = 'No subunit information.'
        projection = {'enzyme': 1, '_id': 0}
        doc = self.collection.find_one({'kinlaw_id': _id}, projection=projection)
        enzyme = doc['enzyme']
        if enzyme != [] and enzyme is not None:
            subunits = enzyme[0].get('subunits')
            if subunits != [] and subunits is not None:
                uniprot_id = subunits[0].get('uniprot', false)
                return uniprot_id
            else:
                return false
        else:
            return false