mwchase/class-namespaces

View on GitHub
src/class_namespaces/namespaces.py

Summary

Maintainability
C
1 day
Test Coverage
"""Class Namespaces (internal module).

All of the guts of the class namespace implementation.

"""


# See https://blog.ionelmc.ro/2015/02/09/understanding-python-metaclasses/

import collections.abc
import functools
import itertools
import weakref

from . import ops
from .descriptor_inspector import DescriptorInspector
from .flags import ENABLE_SET_NAME
from .proxy import _Proxy
from .scope_proxy import _ScopeProxy


# Side table holding the state of every _NamespaceProxy: maps the proxy object
# to the (dct, instance, owner) triple it was constructed with. Keys are held
# weakly, so the table does not keep proxies alive.
_PROXY_INFOS = weakref.WeakKeyDictionary()


def _mro_to_chained(mro, dct):
    """Build a ChainMap of the namespaces at ``dct.path`` along ``mro``.

    Only entries of ``mro`` that are NamespaceableMeta classes are considered.
    The walk stops at the first such class for which ``dct.no_blocker``
    reports a blocker; classes before that point contribute their namespace
    only if one already exists for them at this path.
    """
    path = dct.path
    layers = []
    for klass in mro:
        if not isinstance(klass, NamespaceableMeta):
            continue
        if not dct.no_blocker(path, klass):
            # A non-Namespace object shadows the path from here on.
            break
        if dct.namespace_exists(path, klass):
            layers.append(dct.get_namespace(klass, path))
    return collections.ChainMap(*layers)


def _instance_map(ns_proxy):
    """Return the mapping of names stored directly for the proxy's instance.

    An empty dict is returned when the proxy has no instance, or when the
    namespace lookup raises TypeError. When the instance is itself a
    NamespaceableMeta class, the result is a chained map over its own MRO.
    """
    dct, instance, _ = _PROXY_INFOS[ns_proxy]
    if instance is None:
        return {}
    try:
        if isinstance(instance, NamespaceableMeta):
            return _mro_to_chained(instance.__mro__, dct)
        return dct.get_namespace(instance, dct.path)
    except TypeError:
        return {}


def _instance_namespace(instance, dct, name):
    """Return the Namespace associated with the instance, if possible.

    Args:
        instance: The object whose per-instance namespace is wanted.
        dct: Namespace-like object providing ``get_namespace`` and ``path``.
        name: Attribute name being accessed; used only for the error.

    Returns:
        Whatever ``dct.get_namespace(instance, dct.path)`` returns.

    Raises:
        AttributeError: If the lookup raises TypeError. The TypeError is
            attached as the explicit cause so the traceback shows why the
            instance could not carry a namespace.
    """
    try:
        return dct.get_namespace(instance, dct.path)
    except TypeError as err:
        # Chain explicitly: without ``from``, the traceback reports this as
        # a second error that happened while handling the first.
        raise AttributeError(name) from err


def _mro_map(ns_proxy):
    """Return the chained class-level lookups for the proxy's owner.

    The owner's MRO is trimmed so that it starts at the namespace's
    ``parent_object``; classes earlier in the MRO are not consulted.
    """
    dct, _, owner = _PROXY_INFOS[ns_proxy]
    full_mro = owner.__mro__
    start = full_mro.index(dct.parent_object)
    return _mro_to_chained(full_mro[start:], dct)


def _retarget(ns_proxy):
    """Turn a class-level proxy into an instance-level one where applicable.

    When the proxy has no instance and the owner's own type is a
    NamespaceableMeta class, the owner is treated as an instance of that
    type: a new proxy is built around the type's namespace at the same path.
    Otherwise the proxy is returned unchanged.
    """
    dct, instance, owner = _PROXY_INFOS[ns_proxy]
    if instance is not None or not isinstance(type(owner), NamespaceableMeta):
        return ns_proxy
    owner_type = type(owner)
    retargeted = dct.get_namespace(owner_type, dct.path)
    return _NamespaceProxy(retargeted, owner, owner_type)


class _NamespaceProxy(_Proxy):

    """Attribute-style view onto a Namespace for one instance/owner pair.

    The proxy carries no attributes of its own: the (namespace, instance,
    owner) triple it was built with is kept in the module-level _PROXY_INFOS
    table, keyed by the proxy itself.
    """

    __slots__ = ('__weakref__',)

    def __init__(self, dct, instance, owner):
        # State lives in the side table because attribute access on the
        # proxy is redirected to the namespace contents.
        _PROXY_INFOS[self] = (dct, instance, owner)

    def __dir__(self):
        # Iterating the returned ChainMap yields the visible names.
        own_names = _instance_map(self)
        inherited_names = _mro_map(self)
        return collections.ChainMap(own_names, inherited_names)

    def __getattribute__(self, name):
        proxy = _retarget(self)
        _, instance, owner = _PROXY_INFOS[proxy]
        per_instance = _instance_map(proxy)
        per_class = _mro_map(proxy)
        own = ops.get(per_instance, name)
        inherited = ops.get(per_class, name)
        # Precedence, highest first:
        # 1. a class-level value that ops reports as both "data" and gettable
        if ops.is_data(inherited) and ops.has_get(inherited):
            return inherited.get(instance, owner)
        # 2. for metaclass owners, a gettable value stored on the instance,
        #    bound as if the instance were the owning class
        if issubclass(owner, type) and ops.has_get(own):
            return own.get(None, instance)
        # 3. any value stored on the instance
        if own is not None:
            return own.object
        # 4. a gettable class-level value
        if ops.has_get(inherited):
            return inherited.get(instance, owner)
        # 5. a plain class-level value
        if inherited is not None:
            return inherited.object
        raise AttributeError(name)

    def __setattr__(self, name, value):
        proxy = _retarget(self)
        dct, instance, owner = _PROXY_INFOS[proxy]
        if instance is None:
            # Class-level access: write straight into the owner's namespace.
            dct.get_namespace(owner, dct.path)[name] = value
            return
        handler = ops.get_data(_mro_map(proxy), name)
        if handler is None:
            _instance_namespace(instance, dct, name)[name] = value
        else:
            handler.set(instance, value)

    def __delattr__(self, name):
        proxy = _retarget(self)
        dct, instance, owner = _PROXY_INFOS[proxy]
        owner_map = dct.get_namespace(owner, dct.path)
        if instance is None:
            ops.delete(owner_map, name)
            return
        handler = ops.get_data(owner_map, name)
        if handler is None:
            ops.delete(_instance_namespace(instance, dct, name), name)
        else:
            handler.delete(instance)


# Namespace registries for targets that are not NamespaceableMeta classes:
# maps each such target to its own dict of {path: Namespace}. Keys are held
# weakly.
_NAMESPACE_INFOS = weakref.WeakKeyDictionary()


class Namespace(dict):

    """Namespace.

    Namespace() -> new empty namespace
    Namespace(mapping) -> new namespace initialized from a mapping object's
        (key, value) pairs
    Namespace(iterable) -> new namespace initialized as if via:
        d = {}
        for k, v in iterable:
            d[k] = v
        ns = Namespace(d)
    Namespace(**kwargs) -> new namespace initialized with the name=value pairs
        in the keyword argument list.  For example:  Namespace(one=1, two=2)

    Namespaces implement the context manager protocol. When the context is
    entered in an appropriate class creation scope, the Namespace shadows the
    currently visible scope.

    Namespaces can be re-entered after they are exited, provided they're in the
    same parent scope.
    """

    # name: key this namespace is bound under; None until bound.
    # scope: the class-creation scope it was pushed into; None until bound.
    # parent: the mapping that contains it; None until bound.
    # active: True while it is pushed as the head of its scope.
    # parent_object: the target it was registered under by add().
    # needs_setup: set by __enter__ and cleared once activate() succeeds.
    __slots__ = (
        'name', 'scope', 'parent', 'active', 'parent_object', 'needs_setup')

    # Registry for NamespaceableMeta targets, keyed by (target, path).
    __namespaces = {}

    def __init__(self, *args, **kwargs):
        """Initialize like ``dict``; all binding state starts unset.

        Raises:
            ValueError: If an initial value is a Namespace or a _Proxy.
        """
        super().__init__(*args, **kwargs)
        bad_values = tuple(
            value for value in self.values() if
            isinstance(value, (Namespace, _Proxy)))
        if bad_values:
            raise ValueError('Bad values: {}'.format(bad_values))
        self.name = None
        self.scope = None
        self.parent = None
        self.active = False
        self.parent_object = None
        self.needs_setup = False

    @classmethod
    def premake(cls, name, parent):
        """Return an empty namespace with the given name and parent."""
        self = cls()
        self.name = name
        self.parent = parent
        return self

    # TODO: open question from the original author -- does __delitem__ need
    # a hook symmetric to the one below?
    def __setitem__(self, key, value):
        """Store ``value``, binding it first if it is a Namespace.

        A scope proxy belonging to this namespace's scope is replaced by the
        object registered for it in the scope's ``proxies`` table.

        Raises:
            ValueError: If ``value`` is (still) a _Proxy after that
                translation, or if binding a Namespace value fails
                validation in ``push``.
        """
        if (
                self.scope is not None and
                isinstance(value, self.scope.scope_proxy)):
            value = self.scope.proxies[value]
        if isinstance(value, _Proxy):
            raise ValueError('Cannot move scopes between classes.')
        if isinstance(value, Namespace):
            value.push(key, self.scope, self)
            value.add(self.parent_object)
        super().__setitem__(key, value)

    def __enter__(self):
        """Request activation and activate now if a scope is already known."""
        self.needs_setup = True
        self.activate()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Deactivate the namespace.

        Raises:
            RuntimeError: If the namespace was never given a name.
        """
        if self.name is None:
            raise RuntimeError('Namespace must be named.')
        self.deactivate()

    @property
    def path(self):
        """Return the full path of the namespace as a tuple of names.

        Raises:
            ValueError: If the namespace has no name or no parent yet.
        """
        if self.name is None or self.parent is None:
            raise ValueError
        if isinstance(self.parent, Namespace):
            parent_path = self.parent.path
        else:
            # A non-Namespace parent is the root mapping; the path starts here.
            parent_path = ()
        return parent_path + (self.name,)

    @classmethod
    def __get_helper(cls, target, path):
        """Return the (key, registry) pair to use for `target` at `path`.

        NamespaceableMeta targets share the class-level registry and are
        keyed by ``(target, path)``. Any other target gets its own registry
        in _NAMESPACE_INFOS (created on first use) and is keyed by ``path``.
        """
        if isinstance(target, NamespaceableMeta):
            path_ = target, path
            namespaces = cls.__namespaces
        else:
            path_ = path
            try:
                namespaces = _NAMESPACE_INFOS[target]
            except KeyError:
                namespaces = {}
                _NAMESPACE_INFOS[target] = namespaces
        return path_, namespaces

    @classmethod
    def namespace_exists(cls, path, target):
        """Return whether the given namespace exists."""
        path_, namespaces = cls.__get_helper(target, path)
        return path_ in namespaces

    @classmethod
    def get_namespace(cls, target, path):
        """Return a namespace with given target and path, create if needed.

        Missing ancestors along ``path`` are created recursively. A newly
        created top-level namespace gets a fresh empty dict as its parent.

        Raises:
            ValueError: If ``path`` is empty.
        """
        if not path:
            # An empty path can never reach the ``len(path) == 1`` base case
            # below (``()[:-1]`` is still ``()``), so reject it up front
            # instead of recursing until the interpreter gives up.
            raise ValueError('Cannot get a namespace for an empty path.')
        path_, namespaces = cls.__get_helper(target, path)
        try:
            return namespaces[path_]
        except KeyError:
            if len(path) == 1:
                parent = {}
            else:
                parent = cls.get_namespace(target, path[:-1])
            return namespaces.setdefault(path_, cls.premake(path[-1], parent))

    @classmethod
    def no_blocker(cls, path, cls_):
        """Return False if there's a non-Namespace object in the path.

        The path is followed through ``vars(cls_)``; running out of entries
        before the end of the path does not count as a blocker.
        """
        try:
            namespace = vars(cls_)
            for name in path:
                namespace = namespace[name]
                if not isinstance(namespace, cls):
                    return False
        except KeyError:
            pass
        return True

    def add(self, target):
        """Add self as a namespace under target.

        Does nothing when ``target`` is None. If another namespace is
        already registered for this path, it is kept and ``parent_object``
        is left untouched.
        """
        if target is not None:
            path, namespaces = self.__get_helper(target, self.path)
            res = namespaces.setdefault(path, self)
            if res is self:
                self.parent_object = target

    def set_if_none(self, name, value):
        """Set the attribute `name` to `value`, if it's initially None."""
        if getattr(self, name) is None:
            setattr(self, name, value)

    def validate_assignment(self, name, scope):
        """Confirm the instance can be assigned to the given name and scope.

        Raises:
            ValueError: If the name or the scope differs from the bound one.
        """
        if name != self.name:
            raise ValueError('Cannot rename namespace')
        if scope is not self.scope:
            raise ValueError('Cannot reuse namespace')

    def validate_parent(self, parent):
        """Confirm that the instance has the correct parent dict.

        Raises:
            ValueError: If ``parent`` is not the bound parent object.
        """
        if parent is not self.parent:
            raise ValueError('Cannot reparent namespace')

    def push(self, name, scope, parent):
        """Bind self to the given name, scope and parent.

        Values that are already set are kept and then validated against the
        arguments. The namespace is recorded in ``scope.namespaces`` and is
        activated if an earlier ``__enter__`` is still waiting for a scope.
        """
        self.set_if_none('name', name)
        self.set_if_none('scope', scope)
        self.set_if_none('parent', parent)
        self.validate_assignment(name, scope)
        self.validate_parent(parent)
        self.scope.namespaces.append(self)
        if self.needs_setup:
            self.activate()

    def activate(self):
        """Take over as the scope for the target.

        Without a scope this is a no-op, apart from the double-activation
        check.

        Raises:
            ValueError: If already active, or if the scope's current head is
                not this namespace's parent.
        """
        if self.active:
            raise ValueError('Cannot double-activate.')
        if self.scope is not None:
            self.validate_parent(self.scope.head)
            self.active = True
            self.scope.push(self)
            self.needs_setup = False

    def deactivate(self):
        """Stop being the scope for the target."""
        if self.scope is not None and self.active:
            self.active = False
            self.scope.pop_()

    def total_length(self):
        """Return the total number of elements in the Namespace tree.

        Nested namespaces count once as an entry, plus all of their own
        entries.
        """
        return len(self) + sum(
            ns.total_length() for ns in self.values() if
            isinstance(ns, Namespace))

    def iter_all(self, prefix):
        """Iterate over all elements in the Namespace tree.

        Yields each key with ``prefix`` prepended; entries of nested
        namespaces follow their parent key, joined with a dot.
        """
        for key, value in self.items():
            qualified_name = prefix + key
            yield qualified_name
            if isinstance(value, Namespace):
                yield from value.iter_all(qualified_name + '.')

    def __get__(self, instance, owner):
        """Return a proxy for attribute-style access to this namespace."""
        return _NamespaceProxy(self, instance, owner)

    # Always truthy, even when empty (a plain empty dict would be falsy).
    @staticmethod
    def __bool__():
        return True


class _NamespaceScope(collections.abc.MutableMapping):

    """The class creation namespace for NamespaceableMetas.

    Holds a stack of mappings whose first element (``head``) is the one
    currently being written to. While the class body runs, reads fall
    through the whole stack; after ``finalize``, dotted keys address entries
    inside nested Namespaces.
    """

    __slots__ = ('dicts', 'namespaces', 'proxies', 'scope_proxy', 'finalized')

    def __init__(self, dct):
        self.dicts = [dct]
        self.namespaces = []
        self.proxies = weakref.WeakKeyDictionary()
        self.finalized = False

        owning_scope = self

        class ScopeProxy(_ScopeProxy):

            """Local version of ScopeProxy for this scope."""

            __slots__ = ()

            def __init__(self, dct):
                super().__init__(dct, owning_scope)

        # A per-scope subclass lets isinstance() tell this scope's proxies
        # apart from those of any other scope.
        self.scope_proxy = ScopeProxy

    @property
    def head(self):
        """Return the mapping at the top of the stack."""
        return self.dicts[0]

    def finalize(self):
        """Mark the scope as finished and return its only remaining mapping.

        Raises ValueError if a Namespace is still pushed.
        """
        if len(self.dicts) != 1:
            raise ValueError('Cannot finalize a pushed scope!')
        self.finalized = True
        return self.head

    def push(self, dct):
        """Make ``dct`` the new head of the scope."""
        if self.finalized:
            raise ValueError('Cannot push a finalized scope!')
        self.dicts.insert(0, dct)

    def pop_(self):
        """Drop the current head, exposing the mapping beneath it."""
        if len(self.dicts) == 1:
            raise ValueError('Cannot pop from a basal scope!')
        del self.dicts[0]

    def _raw_get(self, parent, key):
        """Return the Namespace at dotted path ``parent``, unwrapped.

        ``key`` is only used as the argument of the KeyError raised when the
        path is missing or passes through a non-Namespace value.
        """
        current = self.head
        try:
            for part in parent.split('.'):
                current = current[part]
                if not isinstance(current, Namespace):
                    raise KeyError
            return current
        except KeyError:
            raise KeyError(key)

    def wrap(self, value):
        """Return ``value``, wrapped in this scope's proxy if a Namespace."""
        if not isinstance(value, Namespace):
            return value
        return self.scope_proxy(value)

    def __getitem__(self, key):
        if not self.finalized:
            # During class creation every pushed mapping is visible.
            return self.wrap(collections.ChainMap(*self.dicts)[key])
        parent, dot, name = key.rpartition('.')
        if not dot:
            return self.wrap(self.head[key])
        namespace = self._raw_get(parent, key)
        try:
            value = namespace[name]
        except KeyError:
            raise KeyError(key)
        return self.wrap(value)

    def _store(self, key, value, dct):
        """Return the rebased value and target dict."""
        if not self.finalized:
            if value is dct:
                # The value is the head itself, i.e. a Namespace that was
                # just entered; its binding belongs one level down.
                dct = self.dicts[1]
            if isinstance(value, Namespace):
                value.push(key, self, dct)
        if isinstance(value, self.scope_proxy):
            value = self.proxies[value]
            value.validate_parent(dct)
            value.validate_assignment(key, self)
        return value, dct

    def __setitem__(self, key, value):
        value, target = self._store(key, value, self.head)
        if isinstance(value, _Proxy):
            raise ValueError('Cannot move scopes between classes.')
        parent, dot, name = key.rpartition('.')
        if self.finalized and dot:
            # Report a missing parent by the parent's name, not the key's.
            self._raw_get(parent, parent)[name] = value
        else:
            target[key] = value

    def __delitem__(self, key):
        parent, dot, name = key.rpartition('.')
        if self.finalized and dot:
            # Report a missing parent by the parent's name, not the key's.
            del self._raw_get(parent, parent)[name]
        else:
            del self.head[key]

    def __iter__(self):
        if not self.finalized:
            raise ValueError('Iteration not defined on unfinalized scope.')
        for key, value in self.head.items():
            yield key
            if isinstance(value, Namespace):
                yield from value.iter_all(prefix=key + '.')

    def __len__(self):
        if not self.finalized:
            raise ValueError('Length not defined on unfinalized scope.')
        total = len(self.head)
        for value in self.head.values():
            if isinstance(value, Namespace):
                total += value.total_length()
        return total


# Maps each class created by NamespaceableMeta to the _NamespaceScope that
# served as its class-body namespace. Keys are held weakly.
_NAMESPACE_SCOPES = weakref.WeakKeyDictionary()


class NamespaceableMeta(type):

    """Metaclass for classes that can contain namespaces.

    Attribute names containing dots are resolved step by step through the
    namespaces of the class, so ``getattr(cls, 'ns.attr')`` reaches into the
    namespace ``ns``.

    A note for people extending the functionality:
    The base class for NamespaceableMeta uses a non-standard super() invocation
    in its definitions of several methods. This was the only way I could find
    to mitigate some bugs I encountered with a standard invocation. If you
    override any of the methods defined on built-in types, I recommend this
    form for maximal reusability:

    super(class, type(self)).__method__(self, ...)
    """

    @classmethod
    def __prepare__(mcs, name, bases, **kwargs):
        # Wrap the default class-body mapping so namespaces can be tracked.
        default_mapping = super().__prepare__(name, bases, **kwargs)
        return _NamespaceScope(default_mapping)

    def __new__(mcs, name, bases, dct, **kwargs):
        cls = super().__new__(mcs, name, bases, dct.finalize(), **kwargs)
        _NAMESPACE_SCOPES[cls] = dct
        for namespace in dct.namespaces:
            namespace.add(cls)
            if not ENABLE_SET_NAME:
                continue
            # Give values inside the namespace the same __set_name__ style
            # notification that direct class attributes would receive.
            for attr_name, attr in namespace.items():
                inspector = DescriptorInspector(attr)
                if inspector.has_set_name:
                    inspector.set_name(cls, attr_name)
        return cls

    @staticmethod
    def __is_proxy(value):
        """Return ``value`` unchanged if it is a _NamespaceProxy.

        Raises ValueError otherwise: a dotted name tried to descend through
        something that is not a namespace.
        """
        if isinstance(value, _NamespaceProxy):
            return value
        raise ValueError('Given a dot attribute that went too deep.')

    def __getattribute__(cls, name):
        parent, dot, leaf = name.rpartition('.')
        if not dot:
            return super(NamespaceableMeta, type(cls)).__getattribute__(
                cls, name)
        target = cls
        for element in parent.split('.'):
            target = cls.__is_proxy(getattr(target, element))
        return getattr(target, leaf)

    def __setattr__(cls, name, value):
        parent, dot, leaf = name.rpartition('.')
        if not dot and isinstance(value, Namespace) and value.name != name:
            # Bind a namespace assigned after class creation to this class.
            scope = _NAMESPACE_SCOPES[cls]
            value.push(name, scope, scope.head)
            value.add(cls)
        if dot:
            setattr(cls.__is_proxy(getattr(cls, parent)), leaf, value)
        else:
            super(NamespaceableMeta, type(cls)).__setattr__(cls, name, value)

    def __delattr__(cls, name):
        parent, dot, leaf = name.rpartition('.')
        if dot:
            delattr(cls.__is_proxy(getattr(cls, parent)), leaf)
        else:
            super(NamespaceableMeta, type(cls)).__delattr__(cls, name)


class Namespaceable(metaclass=NamespaceableMeta):

    """Optional convenience class. Inherit from it to get the metaclass.

    The class defines nothing itself; it only carries NamespaceableMeta so
    that subclasses do not have to spell out ``metaclass=``.
    """