IRC-SPHERE/HyperStream

View on GitHub
hyperstream/utils/containers.py

Summary

Maintainability
D
2 days
Test Coverage
# The MIT License (MIT)
# Copyright (c) 2014-2017 University of Bristol
#
#  Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to deal
#  in the Software without restriction, including without limitation the rights
#  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
#  copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
#  The above copyright notice and this permission notice shall be included in all
#  copies or substantial portions of the Software.
#
#  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
#  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
#  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
#  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
#  DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
#  OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
#  OR OTHER DEALINGS IN THE SOFTWARE.

from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals

from .errors import StreamNotFoundError
from .time_utils import json_serial

from datetime import datetime
import logging
import sys
from bidict import bidict, ValueDuplicationError
from future.utils import python_2_unicode_compatible
from treelib.tree import Tree, NodePropertyAbsentError, NodeIDAbsentError
import json


# To restore default stdout and stderr channels after reloading sys
# source: https://github.com/ipython/ipython/issues/8354
# Backing up references to the current stdout and stderr
# (these are module-level names, so they run once at import time)
default_stdout = sys.stdout
default_stderr = sys.stderr

# The next two lines are to fix the "UnicodeDecodeError: 'ascii' codec can't
# decode byte" error
# http://stackoverflow.com/questions/21129020/how-to-fix-unicodedecodeerror-ascii-codec-cant-decode-byte
# When the ``reload`` builtin is not defined (the inline note below marks it
# as Python 2.7 only) the resulting NameError is swallowed, so the default
# encoding is left untouched.
try:
    reload(sys)  # Python 2.7
    # noinspection PyUnresolvedReferences
    sys.setdefaultencoding('utf8')
except NameError:
    pass

# Restoring references to the previous stdout and stderr
sys.stdout = default_stdout
sys.stderr = default_stderr


@python_2_unicode_compatible
class MetaDataTree(Tree):
    """
    A treelib ``Tree`` with a custom text rendering.

    ``str(tree)`` renders every node as ``tag[identifier:data]`` using box
    drawing characters, and ``repr(tree)`` gives a one-line summary with the
    node count and the depth of the tree.
    """
    (ROOT, DEPTH, WIDTH, ZIGZAG) = list(range(4))

    def __str__(self):
        """
        Render the tree, one node per line, preceded by a newline.

        The rendered text is accumulated in (and left on) ``self.reader``.
        """
        self.reader = u"\n"

        def write(line):
            # The print backend hands over utf-8 encoded lines
            self.reader += line.decode('utf-8') + "\n"

        self._Tree__print_backend(func=write, idhidden=False)
        return self.reader

    def __repr__(self):
        return "{} with {} nodes and depth {}".format(self.__class__.__name__, len(self.nodes), self.tree_depth)

    @property
    def tree_depth(self):
        """
        The largest depth of any node in the tree, or 0 for an empty tree.
        """
        # max() of an empty sequence raises ValueError, which used to make
        # repr() of an empty tree fail
        if not self.nodes:
            return 0
        return max(self.depth(n) for n in self.nodes)

    # noinspection PyPep8Naming,SpellCheckingInspection
    def _Tree__print_backend(self, nid=None, level=ROOT, idhidden=True, queue_filter=None,
                             key=None, reverse=False, line_type='ascii-emv',
                             data_property=None, func=print, iflast=None):
        """
        Another implementation of printing tree using Stack
        Print tree structure in hierarchy style.
        For example:
            Root
            |___ C01
            |    |___ C11
            |         |___ C111
            |         |___ C112
            |___ C02
            |___ C03
            |    |___ C31
        A more elegant way to achieve this function using Stack
        structure, for constructing the Nodes Stack push and pop nodes
        with additional level info.
        UPDATE: the @key @reverse is present to sort node at each
        level.

        The method carries the name-mangled form of a private ``Tree`` method
        (NOTE(review): this assumes the installed treelib still defines
        ``Tree.__print_backend`` and ``Tree.__real_true`` - confirm).

        :param nid: identifier of the node to start from (defaults to the root)
        :param level: current depth of the recursion; ``ROOT`` for the first call
        :param idhidden: if True only the display value is shown, otherwise
            ``value[identifier:data]``
        :param queue_filter: predicate deciding which nodes are rendered
        :param key: sort key applied to the children of each node (defaults to
            sorting the node objects themselves)
        :param reverse: reverse the order of the children
        :param line_type: name of the set of line drawing characters to use
        :param data_property: if given, this attribute of ``node.data`` is
            displayed instead of the node tag
        :param func: callback receiving each rendered line, utf-8 encoded
        :param iflast: per-level flags telling whether the ancestor at that
            level is the last child of its parent (used internally by the recursion)
        :raises NodeIDAbsentError: if ``nid`` is not in the tree
        :raises NodePropertyAbsentError: if ``data_property`` is given but the
            node data does not have it
        """
        line_types = \
            {'ascii': ('|', '|-- ', '+-- '),
             'ascii-ex': ('\u2502', '\u251c\u2500\u2500 ', '\u2514\u2500\u2500 '),
             'ascii-exr': ('\u2502', '\u251c\u2500\u2500 ', '\u2570\u2500\u2500 '),
             'ascii-em': ('\u2551', '\u2560\u2550\u2550 ', '\u255a\u2550\u2550 '),
             'ascii-emv': ('\u2551', '\u255f\u2500\u2500 ', '\u2559\u2500\u2500 '),
             'ascii-emh': ('\u2502', '\u255e\u2550\u2550 ', '\u2558\u2550\u2550 ')}
        DT_VLINE, DT_LINE_BOX, DT_LINE_COR = line_types[line_type]

        nid = self.root if (nid is None) else nid
        if not self.contains(nid):
            raise NodeIDAbsentError("Node '%s' is not in the tree" % nid)

        node = self[nid]

        if data_property is not None and hasattr(node.data, data_property):
            display_value = getattr(node.data, data_property)
        elif data_property is None:
            display_value = node.tag
        else:
            raise NodePropertyAbsentError("Node '%s' does not have data property '%s'" % (nid, data_property))

        label = ('{0}'.format(display_value)) \
            if idhidden \
            else ('{0}[{1}:{2}]'.format(display_value, node.identifier, str(node.data)))

        queue_filter = self._Tree__real_true if (queue_filter is None) else queue_filter

        if not iflast:
            iflast = []

        if level == self.ROOT:
            func(label.encode('utf8'))
        else:
            # One column per ancestor: a vertical line while that ancestor
            # still has siblings below it, blank space otherwise
            leading = ''.join(map(lambda x: DT_VLINE + ' ' * 3 if not x else ' ' * 4, iflast[0:-1]))
            lasting = DT_LINE_COR if iflast[-1] else DT_LINE_BOX
            func('{0}{1}{2}'.format(leading, lasting, label).encode('utf-8'))

        if queue_filter(node) and node.expanded:
            queue = [self[i] for i in node.fpointer if queue_filter(self[i])]
            key = (lambda x: x) if (key is None) else key
            queue.sort(key=key, reverse=reverse)
            level += 1
            last_index = len(queue) - 1
            # enumerate avoids a linear queue.index() lookup for every child
            for index, element in enumerate(queue):
                iflast.append(index == last_index)
                self._Tree__print_backend(element.identifier, level, idhidden,
                                          queue_filter, key, reverse, line_type, data_property, func, iflast)
                iflast.pop()


@python_2_unicode_compatible
class Printable(object):
    """
    A base class for default printing

    ``repr`` has the form ``ClassName(attr=value, ...)`` and lists the
    instance attributes sorted by name, skipping those whose name starts
    with an underscore. ``str`` returns the same text.
    """
    def __str__(self):
        return repr(self)

    def __repr__(self):
        name = self.__class__.__name__
        values = ", ".join(
            "{}={}".format(k, self.format(v))
            for k, v in sorted(self.__dict__.items())
            # startswith also copes with an empty attribute name
            if not k.startswith("_"))
        return "{}({})".format(name, values)

    @staticmethod
    def format(v):
        """
        Format a single attribute value for use in ``__repr__``.

        :param v: the value to format
        :return: ``repr(v)``, except for datetimes whose ``tzname()`` is 'UTC',
            which are written out field by field with ``tzinfo=UTC``
        """
        if isinstance(v, datetime) and v.tzname() == 'UTC':
            # The closing parenthesis was missing from this format string
            return 'datetime.datetime({}, {}, {}, {}, {}, {}, {}, tzinfo={})'.format(
                v.year, v.month, v.day, v.hour, v.minute, v.second, v.microsecond, 'UTC'
            )
        return repr(v)


class Hashable(object):
    """
    A base class that creates hashes based on the __dict__
    Requires keys to be strings to work properly. It will first try to use json.dumps, but if that fails because one of
    the values is not json serializable (e.g. datetime.datetime) then it will fall back on repr
    """
    # Explicit name override; while None, the class name is used instead
    _name = None

    @property
    def name(self):
        """
        The explicitly assigned name, or the class name if none was assigned.
        """
        return self._name if self._name is not None else self.__class__.__name__

    @name.setter
    def name(self, name):
        # The name is always stored as a string
        self._name = str(name)

    def __hash__(self):
        """
        Hash of the name together with a serialisation of the instance __dict__.
        """
        try:
            return hash((self.name, json.dumps(self.__dict__, sort_keys=True, default=json_serial)))
        except TypeError:
            # Not json serialisable: fall back on the repr of the sorted items
            return hash((self.name, repr(sorted(self.__dict__.items()))))


class TypedBiDict(Printable):
    """
    Custom strongly typed bi-directional dictionary where keys and values must be a specific type.
    Raises ValueDuplicationError if the same value is added again
    """

    def __init__(self, key_type, value_type, *args, **kwargs):
        """
        :param key_type: the type every key must be an instance of
        :param value_type: the type every value must be an instance of
        :param args: positional arguments forwarded to ``bidict``
        :param kwargs: keyword arguments forwarded to ``bidict``
        :raises ValueError: if ``key_type`` or ``value_type`` is not a type
        """
        # The messages used to be passed unformatted, with the offending type
        # as a second exception argument
        if not isinstance(key_type, type):
            raise ValueError("expected type, got {}".format(type(key_type)))
        if not isinstance(value_type, type):
            raise ValueError("expected type, got {}".format(type(value_type)))

        self._store = bidict(*args, **kwargs)
        self.key_type = key_type
        self.value_type = value_type

    def __repr__(self):
        return "{}(key_type={}, value_type={})".format(
            self.__class__.__name__,
            repr(self.key_type),
            repr(self.value_type))

    def __iter__(self):
        return iter(self._store)

    def __len__(self):
        return len(self._store)

    def __getitem__(self, key):
        """
        :raises TypeError: if the key is not an instance of ``key_type``
        :raises StreamNotFoundError: if the key is not present
        """
        if not isinstance(key, self.key_type):
            raise TypeError("expected {}, got {}".format(self.key_type, type(key)))
        try:
            return self._store[key]
        except KeyError:
            # for debugging
            raise StreamNotFoundError(repr(key))

    def __setitem__(self, key, value):
        """
        :raises TypeError: if the key is not an instance of ``key_type``
        :raises ValueError: if the value is not an instance of ``value_type``

        Any error raised by the underlying bidict (such as
        ValueDuplicationError) propagates unchanged.
        """
        if not isinstance(key, self.key_type):
            raise TypeError("expected {}, got {}".format(self.key_type, type(key)))
        if not isinstance(value, self.value_type):
            raise ValueError("expected {}, got {}".format(self.value_type, type(value)))
        self._store[key] = value

    def __delitem__(self, key):
        del self._store[key]

    def __contains__(self, item):
        return item in self._store

    def keys(self):
        return self._store.keys()

    def values(self):
        return self._store.values()

    def items(self):
        return self._store.items()

    # The iter* variants are built on keys()/values()/items() so that they do
    # not depend on the Python 2 only iterkeys()/itervalues()/iteritems()
    def iterkeys(self):
        return iter(self._store.keys())

    def itervalues(self):
        return iter(self._store.values())

    def iteritems(self):
        return iter(self._store.items())


class FrozenKeyDict(dict):
    """
    A dict in which item assignment cannot change the value of an existing key.

    Assigning to a new key stores the value. Assigning to an existing key:

    - if both the stored and the new value are dicts, the new entries are
      merged into the stored dict; an entry present in both must be equal,
      otherwise KeyError is raised
    - otherwise the new value must be equal to the stored one, or KeyError is
      raised
    """
    def __setitem__(self, key, value):
        # A key seen for the first time is stored as in a normal dict
        if key not in self:
            super(FrozenKeyDict, self).__setitem__(key, value)
            return

        current = self[key]

        # Unless both values are dicts, the new value must match the stored one
        if not (isinstance(value, dict) and isinstance(current, dict)):
            if current != value:
                raise KeyError("Key {} has already been set with value {}, new value {}".format(
                               key, current, value))
            return

        def pair_matches(pair):
            return pair[0] == pair[1]

        # Both are dicts: reconcile the new entries with the stored ones
        for k in value:
            if k not in current:
                current[k] = value[k]
                continue

            try:
                if value[k] != current[k]:
                    raise KeyError(
                        "Key {} has already been set with value {}, new value {}"
                        .format(key, current, value))
            except ValueError:
                # The direct comparison failed; compare element by element
                try:
                    if not all(map(pair_matches, zip(value[k], current[k]))):
                        raise KeyError(
                            "Key {} has already been set with value {}, new value {}"
                            .format(key, current, value))
                except ValueError as e:
                    # Still not comparable: report it and take the new entry
                    logging.error('Unable to compare values for key {}:'
                                  ' old {}, new {}, error {}, overwriting'
                                  .format(key, current, value, e))
                    current[k] = value[k]


class TypedFrozenKeyDict(FrozenKeyDict):
    """
    A FrozenKeyDict whose item assignment only accepts keys of a given type.
    """
    def __init__(self, key_type, *args, **kwargs):
        """
        :param key_type: the type that assigned keys must be an instance of
        :param args: positional arguments forwarded to the parent constructor
        :param kwargs: keyword arguments forwarded to the parent constructor
        :raises ValueError: if ``key_type`` is not a type
        """
        key_type_is_type = isinstance(key_type, type)
        if not key_type_is_type:
            message = "Expected type, got {}".format(type(key_type))
            raise ValueError(message)
        self.key_type = key_type
        super(TypedFrozenKeyDict, self).__init__(*args, **kwargs)

    def __setitem__(self, key, value):
        """
        Assign through the parent class after checking the type of the key.

        :raises KeyError: if the key is not an instance of ``key_type``
        """
        if isinstance(key, self.key_type):
            super(TypedFrozenKeyDict, self).__setitem__(key, value)
            return
        raise KeyError("Expected type {}, got {}".format(self.key_type, type(key)))


class _Singleton(type):
    """
    Singleton class, py2k and p3k compatible
    See:
        https://stackoverflow.com/a/33201/1038264
        https://stackoverflow.com/questions/6760685/creating-a-singleton-in-python

    Every class created by this metaclass gets an ``instance`` attribute,
    initially None, holding the one object that calling the class returns.
    """
    # noinspection PyInitNewSignature
    def __init__(cls, name, bases, dictionary):
        # Each class created by the metaclass starts without an instance
        super(_Singleton, cls).__init__(name, bases, dictionary)
        cls.instance = None

    def __call__(cls, *args, **kwargs):
        """
        Return the stored instance, creating it from the arguments of the
        first call if there is none yet.
        """
        existing = cls.instance
        if existing is not None:
            return existing
        cls.instance = super(_Singleton, cls).__call__(*args, **kwargs)
        return cls.instance


class Singleton(type):
    """
    Metaclass that makes every call of a class return the same object.

    Every class created by this metaclass gets an ``_instance`` attribute,
    initially None, holding the object built by the first call.
    """
    # noinspection PyInitNewSignature
    def __init__(cls, name, bases, dictionary):
        # Each class created by the metaclass starts without an instance
        super(Singleton, cls).__init__(name, bases, dictionary)
        cls._instance = None

    def __call__(cls, *args, **kwargs):
        """
        Return the stored instance, creating it from the arguments of the
        first call if there is none yet.
        """
        existing = cls._instance
        if existing is not None:
            return existing
        cls._instance = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instance


class ToolContainer(Printable):
    """
    Placeholder class used to hold tool objects so they are easy to reach.
    It adds nothing of its own to Printable.
    """


class FactorContainer(Printable):
    """
    Dummy class for holding factor creation functions

    Defines no attributes or methods of its own; everything comes from
    Printable.
    """


class PluginContainer(Printable):
    """
    Dummy class for holding plugins

    Defines no attributes or methods of its own; everything comes from
    Printable.
    """


class PluginWrapper(Printable):
    """
    Placeholder class for a single plugin, giving easy access to its tool
    objects (``tools``) and its factors (``factors``).
    """
    def __init__(self):
        # Both containers start out empty
        self.tools, self.factors = ToolContainer(), FactorContainer()