davidlatwe/MontyDB

View on GitHub
montydb/collection.py

Summary

Maintainability
D
2 days
Test Coverage
import warnings
from copy import deepcopy

from .base import (
    BaseObject,
    WriteConcern,
    validate_is_mapping,
    validate_ok_for_update,
    validate_ok_for_replace,
    validate_list_or_none,
    validate_boolean,
)

from .cursor import MontyCursor
from .engine.field_walker import FieldWalker
from .engine.weighted import Weighted
from .engine.queries import QueryFilter
from .engine.update import Updator
from .types import (
    abc,
    bson,
    string_types,
    is_duckument_type,
    Counter,
    on_err_close,
)
from .storage import StorageDuplicateKeyError
from .errors import (
    DuplicateKeyError,
    BulkWriteError,
    WriteError,
)

from .results import (
    DeleteResult,
    InsertOneResult,
    InsertManyResult,
    UpdateResult,
)


# Names of pymongo ``Collection`` APIs that MontyDB does not implement.
# ``MontyCollection.__getattr__`` consults this set and raises
# ``NotImplementedError`` when any of these attributes is accessed,
# instead of silently treating the name as a sub-collection.
NotImplementeds = {
    "aggregate",
    "aggregate_raw_batches",
    "bulk_write",
    "watch",
    "find_raw_batches",
    "find_one_and_delete",
    "find_one_and_replace",
    "find_one_and_update",
    "create_indexes",
    "drop_index",
    "drop_indexes",
    "reindex",
    "list_indexes",
    "index_information",
    "rename",
    "options",
    "map_reduce",
    "inline_map_reduce",
    "parallel_scan",
}


class MontyCollection(BaseObject):
    def __init__(
        self,
        database,
        name,
        create=False,
        codec_options=None,
        read_preference=None,
        write_concern=None,
        read_concern=None,
        session=None,
        **kwargs
    ):
        """Create a handle on collection `name` inside `database`.

        Only `codec_options` and `write_concern` are honored here, falling
        back to the database's values when not given.  The remaining
        parameters (`create`, `read_preference`, `read_concern`, `session`,
        `**kwargs`) mirror the pymongo signature for API compatibility and
        are not used in this implementation.
        """
        super(MontyCollection, self).__init__(
            codec_options or database.codec_options,
            write_concern or database.write_concern,
        )

        # Storage engine is shared with the owning client.
        self._storage = database.client._storage

        self._database = database
        self._name = name
        # Parent chain: (database, this collection).
        self._components = (database, self)

    def __repr__(self):
        """Show the parent database and this collection's name."""
        return "MontyCollection(%r, %r)" % (self._database, self._name)

    def __eq__(self, other):
        """Two collections are equal when database and name both match."""
        if not isinstance(other, self.__class__):
            return NotImplemented
        same_database = self._database == other.database
        return same_database and self._name == other.name

    def __ne__(self, other):
        """Inverse of `__eq__`.

        Propagates `NotImplemented` instead of coercing it to a bool, so
        Python can try the other operand's reflected comparison.  (This
        codebase keeps Python 2 compatibility, where `__ne__` is not
        derived from `__eq__` automatically.)
        """
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal

    def __getattr__(self, name):
        """Treat unknown attribute access as dotted sub-collection access.

        Names listed in `NotImplementeds` raise `NotImplementedError`.
        Underscore-prefixed names raise `AttributeError` with a hint,
        since they are more likely a typo than a sub-collection name.
        """
        if name in NotImplementeds:
            message = "'MontyCollection.%s' is NOT implemented !" % name
            raise NotImplementedError(message)
        if name.startswith("_"):
            sub_name = ".".join((self._name, name))
            raise AttributeError(
                "MontyCollection has no attribute {0!r}. To access the {1}"
                " collection, use database[{1!r}].".format(name, sub_name)
            )
        return self[name]

    def __getitem__(self, key):
        """Return the dotted sub-collection ``<this name>.<key>``."""
        sub_collection_name = self._name + "." + key
        return self._database.get_collection(sub_collection_name)

    @property
    def full_name(self):
        """Dotted namespace of this collection: ``<database>.<name>``."""
        return u"%s.%s" % (self._database.name, self._name)

    @property
    def name(self):
        """The collection name, without the database prefix."""
        return self._name

    @property
    def database(self):
        """The database object this collection belongs to."""
        return self._database

    def with_options(self, codec_options=None, write_concern=None, *args, **kwargs):
        """Return a clone of this collection with different options.

        Extra positional/keyword arguments are accepted for pymongo
        signature compatibility but ignored.
        """
        if not isinstance(write_concern, WriteConcern):
            # Could be `pymongo.WriteConcern` if called from mongoengine.
            write_concern = None

        new_codec_options = codec_options or self.codec_options
        new_write_concern = write_concern or self.write_concern
        return MontyCollection(
            self._database, self._name, False, new_codec_options, new_write_concern
        )

    def insert_one(self, document, bypass_document_validation=False, *args, **kwargs):
        """Insert `document`, generating an ``_id`` in-place when absent.

        :raises DuplicateKeyError: if a document with the same ``_id``
            already exists in storage.
        """
        if bypass_document_validation:
            pass  # validation is not implemented; flag accepted and ignored

        if "_id" not in document:
            document["_id"] = bson.ObjectId()

        try:
            write_result = self._storage.write_one(self, document)
        except StorageDuplicateKeyError:
            dup_message = (
                "E11000 duplicate key error collection: %s index: "
                '_id_ dup key: { : "%s" }' % (self.full_name, str(document["_id"]))
            )
            error_details = {"index": 0, "code": 11000, "errmsg": dup_message}
            raise DuplicateKeyError(dup_message, code=11000, details=error_details)

        return InsertOneResult(write_result)

    def insert_many(
        self, documents, ordered=True, bypass_document_validation=False, *args, **kwargs
    ):
        """Insert an iterable of documents, assigning ``_id``s in-place.

        :raises TypeError: if `documents` is not a non-empty iterable.
        :raises BulkWriteError: on a duplicate ``_id``, carrying the index
            of the offending document and the counts written so far.
        """
        if not isinstance(documents, abc.Iterable) or not documents:
            raise TypeError("documents must be a non-empty list")

        if bypass_document_validation:
            pass  # validation is not implemented; flag accepted and ignored

        def ensure_id(doc):
            if "_id" not in doc:
                doc["_id"] = bson.ObjectId()
            # Keep _id in track for error message
            return doc["_id"]

        counter = Counter(iter(documents), job_on_each=ensure_id)

        try:
            result = self._storage.write_many(self, counter, ordered)
        except StorageDuplicateKeyError:
            dup_message = (
                "E11000 duplicate key error collection: %s index: "
                '_id_ dup key: { : "%s" }' % (self.full_name, str(counter.data))
            )
            failed_at = counter.count - 1
            bulk_detail = {
                "writeErrors": [
                    {
                        "index": failed_at,
                        "code": 11000,
                        "errmsg": dup_message,
                        "op": documents[failed_at],
                    }
                ],
                "writeConcernErrors": [],
                "nInserted": failed_at,
                "nUpserted": 0,
                "nMatched": 0,
                "nModified": 0,
                "nRemoved": 0,
                "upserted": [],
            }
            raise BulkWriteError(bulk_detail)

        return InsertManyResult(result)

    def replace_one(
        self,
        filter,
        replacement,
        upsert=False,
        bypass_document_validation=False,
        *args,
        **kwargs
    ):
        """Replace the first document matching `filter` with `replacement`.

        With ``upsert=True``, inserts `replacement` (generating an ``_id``
        in-place if missing) when nothing matches.
        """
        validate_is_mapping("filter", filter)
        validate_ok_for_replace(replacement)
        validate_boolean("upsert", upsert)

        if bypass_document_validation:
            pass  # validation is not implemented; flag accepted and ignored

        raw_result = {"n": 0, "nModified": 0}
        walker = next(self._internal_scan_query(filter), None)
        if walker is None:
            if upsert:
                if "_id" not in replacement:
                    replacement["_id"] = bson.ObjectId()
                raw_result["upserted"] = replacement["_id"]
                raw_result["n"] = 1
                self._storage.write_one(self, replacement, check_keys=False)
        else:
            raw_result["n"] = 1
            if walker.doc != replacement:
                # Carry over the matched document's _id before overwriting.
                replacement["_id"] = walker.doc["_id"]
                self._storage.update_one(self, replacement)
                raw_result["nModified"] = 1

        return UpdateResult(raw_result)

    def _internal_scan_query(self, query_spec):
        """An internal document generator for update.

        Scans the collection and yields a `FieldWalker` for every document
        matching `query_spec`.

        NOTE: the first match is yielded *twice* on purpose.  Callers probe
        with a single ``next()`` to decide between update and upsert; when
        they then iterate (or call ``next()`` again), the same first match
        is re-delivered so it is not skipped by the probe.
        """
        queryfilter = QueryFilter(query_spec)
        # Second argument 0 presumably means "no limit" — verify against
        # the storage backend's query() signature.
        documents = self._storage.query(MontyCursor(self), 0)
        first_matched = None
        for doc in documents:
            if queryfilter(doc):
                first_matched = queryfilter.fieldwalker
                break

        if first_matched:
            yield first_matched  # for try statement to test update or insert
            yield first_matched  # start update, yield again
            # continue iter documents(generator)
            for doc in documents:
                if queryfilter(doc):
                    yield queryfilter.fieldwalker

    def _internal_upsert(self, query_spec, updator, raw_result):
        """Internal document upsert.

        Builds a base document from the plain (non-operator, non-dotted)
        fields of `query_spec`, applies `updator` to it with
        ``do_insert=True`` and writes the result to storage.  Mutates
        `raw_result` with the upserted ``_id`` and ``n = 1``.
        """
        doc_cls = self._database.codec_options.document_class

        def _strip_operator_fields(node):
            # Recursively drop "$..." operator keys and dotted paths;
            # only plain fields seed the new document.
            if not is_duckument_type(node):
                return node
            pruned = doc_cls()
            for key in list(node.keys()):
                if key[:1] == "$" or "." in key:
                    continue
                pruned[key] = _strip_operator_fields(node[key])
            return pruned

        document = _strip_operator_fields(deepcopy(query_spec))
        if "_id" not in document:
            document["_id"] = bson.ObjectId()
        raw_result["upserted"] = document["_id"]
        raw_result["n"] = 1

        fieldwalker = FieldWalker(document)
        updator(fieldwalker, do_insert=True)
        self._storage.write_one(self, fieldwalker.doc)

    def _no_id_update(self, updator, filter=None):
        """Raise `WriteError` (code 66) when `updator` would set ``_id``
        to something other than the filter's own ``_id`` value."""
        id_operator = updator.operations.get("_id")
        if not id_operator:
            return
        filter_id = (filter or {}).get("_id")
        if id_operator._keep() == filter_id:
            return
        raise WriteError(
            "Performing an update on the path '_id' would "
            "modify the immutable field '_id'",
            code=66,
        )

    def update_one(
        self,
        filter,
        update,
        upsert=False,
        bypass_document_validation=False,
        array_filters=None,
        *args,
        **kwargs
    ):
        """Apply `update` to the first document matching `filter`.

        With ``upsert=True``, builds a new document from `filter`'s plain
        fields plus the update when nothing matches.
        """
        validate_is_mapping("filter", filter)
        validate_ok_for_update(update)
        validate_list_or_none("array_filters", array_filters)
        validate_boolean("upsert", upsert)

        if bypass_document_validation:
            pass  # validation is not implemented; flag accepted and ignored

        raw_result = {"n": 0, "nModified": 0}
        updator = Updator(update, array_filters)
        self._no_id_update(updator, filter)
        walker = next(self._internal_scan_query(filter), None)
        if walker is None:
            if upsert:
                self._internal_upsert(filter, updator, raw_result)
        else:
            # A document matched: re-check with no filter _id to lean on.
            self._no_id_update(updator)

            raw_result["n"] = 1
            if updator(walker):
                self._storage.update_one(self, walker.doc)
                raw_result["nModified"] = 1

        return UpdateResult(raw_result)

    def update_many(
        self,
        filter,
        update,
        upsert=False,
        bypass_document_validation=False,
        array_filters=None,
        *args,
        **kwargs
    ):
        """Apply `update` to every document matching `filter`.

        With ``upsert=True``, inserts a document built from `filter`'s
        plain fields when nothing matches.  Note the returned counts are
        filled in only after the storage layer has fully consumed the
        update generator below.
        """
        validate_is_mapping("filter", filter)
        validate_ok_for_update(update)
        validate_list_or_none("array_filters", array_filters)
        validate_boolean("upsert", upsert)

        if bypass_document_validation:
            # validation is not implemented; flag accepted and ignored
            pass

        raw_result = {"n": 0, "nModified": 0}
        updator = Updator(update, array_filters)
        scanner = self._internal_scan_query(filter)
        self._no_id_update(updator, filter)
        try:
            # Probe for at least one match; the scanner re-yields the first
            # match afterwards, so the probe loses nothing.
            next(scanner)
        except StopIteration:
            if upsert:
                self._internal_upsert(filter, updator, raw_result)
        else:
            self._no_id_update(updator)

            @on_err_close(scanner)
            def update_docs():
                # Lazily feeds modified docs to storage; writes the final
                # counts into `raw_result` only once iteration completes.
                n, m = 0, 0
                for fieldwalker in scanner:
                    n += 1
                    if updator(fieldwalker):
                        m += 1
                        yield fieldwalker.doc
                raw_result["n"] = n
                raw_result["nModified"] = m

            self._storage.update_many(self, update_docs())

        return UpdateResult(raw_result)

    def delete_one(self, filter):
        """Delete the first document matching `filter`; ``n`` is 0 or 1."""
        raw_result = {"n": 0}

        predicate = QueryFilter(filter)
        storage = self._storage

        for doc in storage.query(MontyCursor(self), 0):
            if predicate(doc):
                storage.delete_one(self, doc["_id"])
                raw_result["n"] = 1
                break

        return DeleteResult(raw_result)

    def delete_many(self, filter):
        """Delete every document matching `filter`."""
        predicate = QueryFilter(filter)
        storage = self._storage

        matched = 0
        doc_ids = set()
        for doc in storage.query(MontyCursor(self), 0):
            if predicate(doc):
                doc_ids.add(doc["_id"])
                matched += 1

        storage.delete_many(self, doc_ids)

        return DeleteResult({"n": matched})

    def find(self, *args, **kwargs):
        """Return a `MontyCursor` over this collection; all arguments are
        forwarded to the cursor unchanged."""
        # return a cursor
        return MontyCursor(self, *args, **kwargs)

    def find_one(self, filter=None, *args, **kwargs):
        """Return the first matching document, or ``None``.

        A non-mapping `filter` is treated as an ``_id`` value, matching
        pymongo's behavior.
        """
        if filter is not None and not isinstance(filter, abc.Mapping):
            filter = {"_id": filter}

        cursor = self.find(filter, *args, **kwargs).limit(-1)
        return next(iter(cursor), None)

    def count(self, filter=None, **kwargs):
        """DEPRECATED — forwards to `count_documents` after warning."""
        warnings.warn(
            "count is deprecated. Use Collection.count_documents instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.count_documents(filter, **kwargs)

    def count_documents(self, filter, **kwargs):
        """Count documents matching `filter` by exhausting a cursor."""
        matches = MontyCursor(self, filter=filter, **kwargs)
        return sum(1 for _ in matches)

    def distinct(self, key, filter=None, **kwargs):
        """Return the sorted distinct values of field `key`.

        When `filter` is given, only documents matching it contribute
        values.

        Fix: uniqueness is now checked against *all* values collected so
        far.  Previously new values were only compared against values from
        earlier documents, so a value occurring several times inside one
        document (e.g. within an array) appeared duplicated in the result.

        :param key: field name to collect values from.
        :param filter: optional query document.
        :raises TypeError: if `key` is not a string.
        """
        if not isinstance(key, string_types):
            raise TypeError(
                "key must be an instance of %s" % (string_types.__name__,)
            )

        result = list()

        def collect_values(doc):
            # `Weighted` wraps each value so mixed-type values can be
            # ordered by the final sorted() call.
            fieldwalker = FieldWalker(doc)
            fieldvalues = fieldwalker.go(key).get().value
            for v in fieldvalues.iter_flat():
                weighted = Weighted(v)
                # Membership test on the shared accumulator also filters
                # duplicates found within a single document.
                if weighted not in result:
                    result.append(weighted)

        documents = self._storage.query(MontyCursor(self), 0)
        queryfilter = QueryFilter(filter) if filter else None

        for doc in documents:
            if queryfilter is None or queryfilter(doc):
                collect_values(doc)

        return [weighted.value for weighted in sorted(result)]

    def drop(self):
        """Drop this collection from its parent database."""
        self._database.drop_collection(self._name)

    def save(self, to_save, *args, **kwargs):
        """DEPRECATED — insert `to_save`, or replace (upserting) the
        existing document when `to_save` carries an ``_id``."""
        if "_id" not in to_save:
            self.insert_one(to_save, *args, **kwargs)
        else:
            self.replace_one(
                {"_id": to_save["_id"]}, to_save, upsert=True, *args, **kwargs
            )

    def create_index(self, *args, **kwargs):
        """Not functioning, currently only exists for mongoengine support.
        """