KarrLab/datanator_query_python

View on GitHub
datanator_query_python/util/mongo_util.py

Summary

Maintainability
C
1 day
Test Coverage
B
86%
import pymongo
from pymongo.read_preferences import Primary, PrimaryPreferred, Secondary, SecondaryPreferred, Nearest
import copy
import json
from genson import SchemaBuilder


class MongoUtil:

    def __init__(self, cache_dirname=None, MongoDB=None, replicaSet=None, db='test',
                 verbose=False, max_entries=float('inf'), username=None, 
                 password=None, authSource='admin', readPreference='nearest'):
        string = "mongodb+srv://{}:{}@{}/{}?authSource={}&retryWrites=true&w=majority&readPreference={}".format(username, password, MongoDB, db, authSource, readPreference)
        self.client = pymongo.MongoClient(string)
        self.db_obj = self.client.get_database(db)

    def list_all_collections(self):
        '''List all non-system collections within database
        '''
        return self.db_obj.list_collection_names()

    def con_db(self, collection_str):
        try:
            collection = self.db_obj[collection_str]
            return (self.client, self.db_obj, collection)
        except pymongo.errors.ConnectionFailure:
            return ('Server not available')
        except pymongo.errors.ServerSelectionTimeoutError:
            return ('Server timeout')

    def get_schema(self, collection_str):
        '''Get schema of a collection
           removed '_id' from collection due to its object type
           and universality 
        '''
        _, _, collection = self.con_db(collection_str)
        doc = collection.find_one({})
        builder = SchemaBuilder()
        del doc['_id']
        builder.add_object(doc)
        return builder.to_schema()

    def flatten_collection(self, collection_str):
        '''Flatten a collection

            c is ommitted because it does not have a non-object 
            value associated with it
        '''
        _, _, collection = self.con_db(collection_str)

        pipeline = [
            { "$addFields": { "subdoc.a": "$a" } },
            { "$replaceRoot": { "newRoot": "$subdoc" }  }
        ]
        flat_col = collection.aggregate(pipeline)
        return flat_col

    def get_duplicates(self, collection_str, _key, **kwargs):
        """Get duplicate key entries in collection.
        
        Args:
            collection_str (:obj:`str`): Name of collection.
            _key (:obj:`str`): Name of field in wichi to find duplicate values.

        Return:
            (:obj:`tuple` of :obj:`int` and :obj:`CommandCursor`):
            length of cursor and documents iterable.
        """
        collection = self.db_obj[collection_str]
        _id = "$"+_key
        pipeline = [{"$group": {"_id": {_key: _id}, "count": {"$sum": 1}, "uniqueIds": {"$addToSet": "$_id"}}},
                    {"$match": {"count": {"$gt": 1}}},
                    {"$sort": {"count": -1}}]
        count_pipeline = copy.deepcopy(pipeline)
        count_pipeline[-1] = {"$count": "total_return"}
        counts = collection.aggregate(count_pipeline, **kwargs)
        for i, count in enumerate(counts):
            if i == 0:
                num = count["total_return"]                    
        return num, collection.aggregate(pipeline, **kwargs)

    def define_schema(self, collection_str, json_schema):
        """Define collection's $jsonSchema
        (https://docs.mongodb.com/manual/reference/operator/query/jsonSchema/)

        Args:
            collection_str (:obj:`str`): Name of the collection to be defined.
            json_schema (:obj:`str` or :obj:`dict`): location of the jsonSchema definition.
        """
        if isinstance(json_schema, str):
            with open(json_schema) as j:
                schema = json.load(j)
        else:
            schema = json_schema
        self.db_obj.create_collection(collection_str,
                                      validator={"$jsonSchema": schema},
                                      validationLevel="moderate")

    def update_observation(self, 
                           obj,
                           source,
                           db="test",
                           op="update",
                           col="observation",
                           query={}):
        """Update observation collection

        Args:
            obj(:obj:`Obj`): obs object to be updated with.
            source(:obj:`Obj`): one of the sources used for matching.
            db(:obj:`Obj`): Name of database to be updated. 
            op(:obj:`str`): Operation to be done.
            col(:obj:`str`): Name of collection to be updated.
            query(:obj:`Obj`): Filter for updating operation.
        """
        _set = {}
        add_to_set = {}
        for key, val in obj.items():
            if isinstance(val, list):
                add_to_set[key] = {"$each": val}
            else:
                _set[key] = val
        if query == {}:
            query = {"$and": [{"identifier": _set["identifier"]},
                              {"source": {"$elemMatch": source}}]}
        _update = {"$set": _set,
                   "$addToSet": add_to_set}
        if op == "update":
            self.client[db][col].update_one(query,
                                            _update,
                                            upsert=True)
        else:
            return query, _update

    def update_entity(self,
                      obj,
                      match,
                      db="test",
                      col="entity",
                      op="update",
                      query={}):
        """Update entity collection.

        Args:
            obj(:obj:`Obj`): object to be updated with.
            match(:obj:`Obj`): Identifier used to match existing document in collection.
            db(:obj:`str`): Name of database to be updated.
            col(:obj:`str`): Name of collection to be updated.
            op(:obj:`str`): operation to be done, e.g. update or bulk.
            query(:obj:`Obj`): Filter for updating operation.
        """
        _set = {}
        add_to_set = {}
        for key, val in obj.items():
            if isinstance(val, list):
                add_to_set[key] = {"$each": val}
            else:
                _set[key] = val
        if query == {}:
            query = {"identifiers": {"$elemMatch": match}}
        _update = {"$set": _set,
                   "$addToSet": add_to_set}
        if op == "update":
            self.client[db][col].update_one(query,
                                            _update,
                                            upsert=True)
        else:
            return query, _update

    def build_taxon_object(self, _id, _format="tax_id"):
        """Build taxon object from taxon_id.
        (https://github.com/KarrLab/datanator_pattern_design/blob/master/components/taxon.json)

        Args:
            _id (:obj:`int`): Organism's taxonomy ID.

        Return:
            (:obj:`Obj`)
        """
        if _format == "tax_name":
            query = {"$or": [{"tax_name": _id}, {"name_txt": _id}]}
        else:
            query = {_format: _id}
        obj = self.client["datanator-test"]["taxon_tree"].find_one(query,
                                                                    projection={"canon_anc_ids": 1,
                                                                                "canon_anc_names": 1,
                                                                                "tax_name": 1,
                                                                                "tax_id": 1})
        if obj is None:
            return {}
        else:
            canon_ancestors = []
            canon_anc_ids = obj["canon_anc_ids"]
            canon_anc_names = obj["canon_anc_names"]
            for _id, name in zip(canon_anc_ids, canon_anc_names):
                canon_ancestors.append({"ncbi_taxonomy_id": _id,
                                        "name": name})
            return {"ncbi_taxonomy_id": obj["tax_id"],
                    "name": obj["tax_name"],
                    "canon_ancestors": canon_ancestors}