KarrLab/datanator_query_python

datanator_query_python/aggregate/tabu.py

"""Tabulate taxonomic distribution of all observations across relevant collections.
https://github.com/KarrLab/datanator_rest_api/issues/97
"""
from datanator_query_python.util import mongo_util
from datanator_query_python.aggregate import pipelines


class Tabu(mongo_util.MongoUtil):
    def __init__(self, MongoDB=None, db=None, username=None, password=None,
                 authSource=None, readPreference=None, max_entries=float('inf'),
                 verbose=True):
        super().__init__(MongoDB=MongoDB, db=db, username=username, password=password,
                         authSource=authSource, readPreference=readPreference,
                         max_entries=max_entries, verbose=verbose)
        self.verbose = verbose
        self.max_entries = max_entries
        self.uniprot_col = self.db_obj['uniprot']
        self.pipeline_manager = pipelines.Pipeline()

    def uniprot_taxon_dist(self):
        """Tabulate uniprot collections taxonomic distribution.

        Return:
            (:obj:`CommandCursor`)
        """
        match = {"$match": {"abundances": {"$exists": True}}}
        pipeline = self.pipeline_manager.aggregate_all_occurences("ncbi_taxonomy_id", match=match,
                                                                  project_post={"count": 1, "species_name": 1})
        return self.uniprot_col.aggregate(pipeline)

    def taxon_dist(self, collection, field, match=None, unwind=None):
        """Generalized version of uniprot_taxon_dist.

        Args:
            collection(:obj:`str`): Name of the collection to aggregate over.
            field(:obj:`str`): Field upon which the aggregation will be done.
            match(:obj:`Obj`): ``$match`` stage used to filter out irrelevant documents.
            unwind(:obj:`Obj`): ``$unwind`` stage used when the field of interest lives in an array of subdocuments.

        Return:
            (:obj:`CommandCursor`): cursor over per-taxon observation counts.
        """
        pipeline = self.pipeline_manager.aggregate_all_occurences(field, match=match,
                                                                  unwind=unwind)
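        # Each document yielded by the returned cursor is expected to look like
        # {"_id": <taxonomy id>, "count": <number of observations>}; main() below
        # relies on exactly these two keys.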
        return self.db_obj[collection].aggregate(pipeline)


from datanator_query_python.config import config
import itertools
from collections import defaultdict
import json


def main():
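    """Tally the taxonomic distribution of observations across the relevant
    collections and write the results to ./docs/ as JSON summaries."""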
    db = 'datanator'
    conf = config.TestConfig()
    username = conf.USERNAME
    password = conf.PASSWORD
    MongoDB = conf.SERVER
    src = Tabu(MongoDB=MongoDB, username=username, password=password,
               db=db, verbose=True, authSource='admin',
               readPreference='nearest', max_entries=10)

    uniprot_docs = src.uniprot_taxon_dist()
    sabio_docs = src.taxon_dist("sabio_rk_old", "taxon_id", match={"$match": {"taxon_id": {"$ne": None}}})
    rnamod_docs = src.taxon_dist("rna_modification", "modifications.ncbi_taxonomy_id", unwind={"$unwind": "$modifications"}, 
                                 match={"$match": {"modifications.ncbi_taxonomy_id": {"$ne": None}}})
    rnahalf_docs = src.taxon_dist("rna_halflife_new", "halflives.ncbi_taxonomy_id", unwind={"$unwind": "$halflives"})
    conc_docs = src.taxon_dist("metabolite_concentrations", "concentrations.ncbi_taxonomy_id", unwind={"$unwind": "$concentrations"})

    obj = defaultdict(int)
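    # Merge the per-collection tallies into a single taxon id -> count map.
    # zip_longest pads the shorter cursors with {'_id': 1, 'count': 0}: NCBI
    # taxonomy id 1 is the root node, it only ever contributes a zero count, and
    # it is skipped again when the frontend summary is built below.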
    for (p_doc, s_doc, m_doc, l_doc, c_doc) in itertools.zip_longest(uniprot_docs, sabio_docs, rnamod_docs, rnahalf_docs, conc_docs, fillvalue={'_id': 1, 'count': 0}):
        obj[p_doc['_id']] += p_doc['count']
        obj[s_doc['_id']] += s_doc['count']
        obj[m_doc['_id']] += m_doc['count']
        obj[l_doc['_id']] += l_doc['count']
        obj[c_doc['_id']] += c_doc['count']

    # sort obj by observation count, descending
    new_obj = {k: v for k, v in sorted(obj.items(), key=lambda item: item[1], reverse=True)}

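    # taxon_distribution.json maps each NCBI taxonomy id to its total observation
    # count across the collections queried above (json.dump stringifies the
    # integer dict keys).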
    with open("./docs/taxon_distribution.json", "w+") as outfile: 
        json.dump(new_obj, outfile)

    final_obj = {"others": 0}
    for i, (key, val) in enumerate(new_obj.items()):
        if i < 11 and key != 1:
            name = src.db_obj['taxon_tree'].find_one({"tax_id": int(key)}, projection={"tax_name": 1})["tax_name"]
            final_obj[name] = val
        else:
            final_obj["others"] += val 

    with open("./docs/taxon_distribution_frontend.json", "w+") as outfile: 
        json.dump(final_obj, outfile)


if __name__ == '__main__':
    main()