sonntagsgesicht/unicum

View on GitHub
unicum/datarange.py

Summary

Maintainability
D
2 days
Test Coverage
# -*- coding: utf-8 -*-

# unicum
# ------
# Python library for simple object cache and factory.
# 
# Author:   sonntagsgesicht, based on a fork of Deutsche Postbank [pbrisk]
# Version:  0.3, copyright Wednesday, 18 September 2019
# Website:  https://github.com/sonntagsgesicht/unicum
# License:  Apache License 2.0 (see LICENSE file)


LEFT_TOP = None


class DataRange(object):
    def __init__(self, iterable=None, value_types=(float, int, str, type(None)), none_alias=(None, ' ', '', 'None'),
                 **kwargs):
        self._value_types = value_types
        self._none_alias = none_alias
        self._col_keys = list()
        self._row_keys = list()

        # convert dict into nested list
        if isinstance(iterable, dict):
            iterable = DataRange.__dict_to_nested_list(iterable)

        # convert DataRange into nested list
        if isinstance(iterable, DataRange):
            iterable = iterable.to_serializable()

        # replace None alias by None
        none_alias = none_alias if isinstance(none_alias, (tuple, list)) else [none_alias]
        if iterable:
            f = (lambda x: None if x in none_alias else x)
            iterable = [[f(c) for c in r] for r in iterable]

        # slice nested list iterable into (column headers, row headers, data)
        col_keys, row_keys, item_list = DataRange.__slice_nested_list(iterable)

        # validate iterable (only admissible value types)
        for row_value in item_list:
            for value in row_value:
                self._validate_value(value)

        # validate column header entries for ambiguity (no int)
        # if any(isinstance(col_key, int) for col_key in col_keys):
        #    raise KeyError('Column keys must not be of type int.')

        # validate row header entries for ambiguity (no int < len(row_headers))
        # if any(isinstance(row_key, int) for row_key in row_keys):
        #    raise KeyError('If row keys are of type int, values not must between 0 and %i.' % len(row_keys))

        if not len(col_keys) == len(set(col_keys)):
            raise KeyError('Col keys must be unique.')
        if not len(row_keys) == len(set(row_keys)):
            raise KeyError('Row keys must be unique.')

        self._col_keys = col_keys
        self._row_keys = row_keys
        self._item_list = item_list

        iterable = list()
        for row_key, row_values in zip(row_keys, item_list):
            for col_key, value in zip(col_keys, row_values):
                iterable.append(((row_key, col_key), value))

        # super(DataRange, self).__init__(iterable, **kwargs)
        self._dict = dict(iterable, **kwargs)
        self._hash = hash(self)

    @staticmethod
    def __dict_to_nested_list(iterable):
        item_list = [list(iterable.keys())]
        for i in range(max(list(map(len, list(iterable.values()))))):
            item_list.append([iterable[k][i] for k in item_list[0]])
        return item_list

    @staticmethod
    def __slice_nested_list(iterable):
        if not iterable:
            return list(), list(), list()

        # extract column headers from first row
        col_header = iterable.pop(0)
        if not len(set(col_header)) == len(col_header):
            raise ValueError('All column header entries must be unique.')

        # extract row headers if given
        if col_header.count(None):
            i = col_header.index(None)
            col_header.pop(i)
            row_header = [row.pop(i) for row in iterable]
        else:
            row_header = list(range(len(iterable)))
            if not len(set(row_header)) == len(row_header):
                raise ValueError('All row header entries must be unique.')

        return col_header, row_header, iterable

    def _validate_value(self, value):
        if not isinstance(value, self._value_types):
            s = ', '.join([str(t) for t in self._value_types])
            t = type(value)

            msg = 'All properties of item in this DataRange must have type ' \
                  'of either one of %s but not %s.' % (s, t)
            raise TypeError(msg)

        return True

    def _validate_key_format(self, key):
        if not isinstance(key, (list, tuple)) or not len(key) == 2:
            s = self.__class__.__name__, key.__class__.__name__
            raise KeyError('Key of %s must be (row_key, col_key) tuple not %s.' % s)

    def _validate_key_existence(self, key):
        self._validate_key_format(key)
        r, c = key
        if r not in self._row_keys or c not in self._col_keys:
            s = self.__class__.__name__, str(self._row_keys), str(self._col_keys)
            msg = 'Key of %s must be (row_key, col_key) tuple with \n row_key in %s \n and col_key in %s' % s
            raise KeyError(msg)
        return True

    def __hash__(self):
        return hash(frozenset(list(self._dict.items())))

    def __repr__(self):
        return self.__class__.__name__ + '(%s)' % str(id(self))

    def __str__(self):
        return self.__class__.__name__ + '(%s)' % str(self.to_serializable())

    def __len__(self):
        return len(self._row_keys)

    def __eq__(self, other):
        if not isinstance(other, DataRange):
            raise TypeError('Cannot compare %s with %s.' % (self.__class__.__name__, other.__class__.__name__))
        return self.total_list == other.total_list

    def __contains__(self, key):
        try:
            self._validate_key_existence(key)
        except KeyError:
            return False
        return True

    def __copy__(self):
        return self.__class__(self.total_list)

    def __deepcopy__(self, memo):
        return self.__class__(self.to_serializable())

    def __setitem__(self, key, value):
        self._validate_key_format(key)
        self._validate_value(value)
        self._dict[key] = value
        self._update_cache()

    def __getitem__(self, key):
        # redirect slice
        if isinstance(key, slice):
            return self.__getslice__(key.start, key.stop)
        self._validate_key_format(key)
        self._validate_key_existence(key)
        return self._dict.get(key, None)

    def __delitem__(self, key):
        raise NotImplementedError

    def __setslice__(self, i, j, sequence):
        raise NotImplementedError

    def __getslice__(self, i, j):
        self._update_cache()
        if not isinstance(i, (tuple, list)):
            l = len(self._col_keys)
            return self[(i, 0):(j, l)]

        ri, ci = i
        rj, cj = j
        ri = self._row_keys.index(ri) if self._row_keys.count(ri) else ri
        rj = self._row_keys.index(rj) + 1 if self._row_keys.count(rj) else rj
        ci = self._col_keys.index(ci) if self._col_keys.count(ci) else ci
        cj = self._col_keys.index(cj) + 1 if self._col_keys.count(cj) else cj
        row_keys = self._row_keys[ri:rj]
        col_keys = self._col_keys[ci:cj]
        ret = list()
        for r in row_keys:
            row = [self[r, c] for c in col_keys]
            ret.append(row)
        return ret

    def __delslice__(self, i, j):
        raise NotImplementedError

    def update(self, other):
        r = self._dict.update(other)
        self._update_cache()
        return r

    def keys(self):
        return list(self._dict.keys())

    def values(self):
        return list(self._dict.values())

    def items(self):
        return list(self._dict.items())

    def get(self, key, default=None):
        return self._dict.get(key, default)

    def pop(self, key, default=None):
        r = self._dict.pop(key, default)
        self._update_cache()
        return r

    def popitem(self):
        raise NotImplementedError

    def row_append(self, row_key, value_list):
        """
        append a new row to a DataRange

        :param row_key: a string
        :param value_list: a list
        """
        if row_key in self._row_keys:
            raise KeyError('Key %s already exists in row keys.' % row_key)
        if not len(value_list) == len(self._col_keys):
            raise ValueError('Length of data to set does not meet expected row length of %i' % len(self._col_keys))
        self.update(list(zip(list(zip([row_key]*len(self._col_keys), self._col_keys)), value_list)))

    def col_append(self, col_key, value_list):
        """
        append a new row to a DataRange

        :param row_key: a string
        :param value_list: a list
        """
        if col_key in self._col_keys:
            raise KeyError('Key %s already exists col keys.' % col_key)
        if not isinstance(value_list, (tuple, list)):
            value_list = [value_list] * len(self._row_keys)
        if not len(value_list) == len(self._row_keys):
            raise ValueError('Length of data to set does not meet expected col length of %i' % len(self._row_keys))
        self.update(list(zip(list(zip(self._row_keys, [col_key]*len(self._row_keys))), value_list)))

    def _update_cache(self):
        if not self._hash == hash(self):
            row_keys = sorted(list(set([row_key for row_key, col_key in list(self.keys())
                                        if row_key not in self._row_keys])))
            self._row_keys += row_keys

            col_keys = sorted(list(set([col_key for row_key, col_key in list(self.keys())
                                        if col_key not in self._col_keys])))
            self._col_keys += col_keys

            self._item_list = [[self.get((r, c)) for c in self._col_keys] for r in self._row_keys]
            self._hash = hash(self)

    # @property
    def row_keys(self):
        return self._row_keys

    # @property
    def col_keys(self):
        return self._col_keys

    def row(self, item):
        i = self.row_keys().index(item)
        return self.item_list[i]

    def col(self, item):
        j = self.col_keys().index(item)
        return [row[j] for row in self.item_list]

    @property
    def item_list(self):
        return self._item_list

    @property
    def total_list(self):
        if not self:
            return [[]]
        total = [[LEFT_TOP] + self.col_keys()]
        for r, row in zip(self.row_keys(), self.item_list):
            total.append([r] + row)
        return total

    def to_serializable(self, level=0, all_properties_flag=False, recursive=True):
        ret = list()
        for r in self.total_list:
            l = list()
            for v in r:
                if recursive:
                    v = v if not hasattr(v, 'to_serializable') else v.to_serializable(level + 1, all_properties_flag)
                    v = v if isinstance(v, (float, int, type(None))) else str(v)
                v = self._none_alias[0] if isinstance(v, type(None)) else v
                l.append(v)
            ret.append(l)
        return ret

    def transpose(self):
        return self.__class__(list(zip(*self.total_list)), value_types=self._value_types, none_alias=self._none_alias)

    def append(self, key, value):
        self.row_append(key, value)

    def extend(self, other):
        raise NotImplementedError

    def insert(self, item, key, value=None):
        raise NotImplementedError

    def __reduce__(self):

        return self.__class__, (self.total_list, self._value_types, self._none_alias)