niccokunzmann/ObservableList

View on GitHub
ObservableList/test/test_observable_list.py

Summary

Maintainability
B
5 hrs
Test Coverage
"""Test the observable list.

The :class:`knittingpattern.ObservableList.ObservableList` is a
:class:`list` implementing the observer pattern.
This way, a row can be notifies abot the change in its instructions.
"""

from pytest import fixture
import pytest
from unittest.mock import MagicMock
from ObservableList import ObservableList
from weakref import WeakKeyDictionary
import sys
from functools import wraps
import traceback

PY2 = sys.version_info[0] == 2


def python3(function):
    """Annotate the test to only execute it in Python 3."""
    return pytest.mark.skipif(PY2, reason="requires python3")(function)


@fixture
def changes():
    """A list of changes made."""
    return []


@fixture
def onchange(changes):
    """The fucntion to change if a change happens."""
    def add_change(change):
        changes.append((change, change.elements[:]))
    return add_change


@fixture
def ol(onchange):
    """The observable list."""
    ol_ = ObservableList()
    ol_.register_observer(onchange)
    return ol_


class OtherSelf(object):

    """Class for other selves."""

    def call(self, attr):
        def call(*args, **kw):
            before_list = self.real_list[:]
            before_observable_list = self.observable_list[:]
            before_changes = self.changes[:]
            observable_value = call_attr(self.observable_list, attr, args, kw)
            real_value = call_attr(self.real_list, attr, args, kw)
            return StepTester((self.real_list, self.observable_list),
                              (before_list, before_observable_list),
                              (real_value, observable_value),
                              (before_changes, self.changes))
        return call

other_selves = WeakKeyDictionary()


def other_self(func):
    """Wrapper for functions that prefer an other self."""
    @wraps(func)
    def wrapper(self, *args, **kw):
        other_self = other_selves.get(self, OtherSelf())
        other_selves.setdefault(self, other_self)
        return func(other_self, *args, **kw)
    return wrapper


def call_attr(obj, attr, args, kw):
    """Calls an object s attribute and returns the result and the exception.

    :param obj: the object to call the method on
    :param: attr: the attribute to get the coallable from
    :param tuple args: a tuple of arguments. If elements of :paramref:`args`
      are callable, their evaluation result becones the argument. This is
      useful if you pass a mutable object i.e. a generator.
    :param dict kw: the keyword arguments
    """
    args_ = []
    for arg in args:
        if callable(arg):
            args_.append(arg())
        else:
            args_.append(arg)
    try:
        return getattr(obj, attr)(*args_, **kw), None
    except:
        ty, err, tb = sys.exc_info()
        traceback.print_exception(ty, err, tb)
        return None, err


PRECONDITION = "Observable and real object should start as the same."
OBSERVER_IS_NOTIFIED = "The observer must be notified. "\
    "You probably forgot the call \"notify_observers(change)\"."
ERROR_SAME = "error is the same"


class StepTester(object):

    def __init__(self, real, before, value, changes):
        self.real, self.observable = real
        self.real_before, self.observable_before = before
        self.value, self.observable_value = value
        self.changes_before, self.changes_after = changes
        self.assert_same()

    @property
    def last_change(self):
        """The last change."""
        return self.changes_after[-1][0]

    def assert_same(self):
        assert self.real_before == self.observable_before, PRECONDITION
        assert self.real == self.observable
        for i, element in enumerate(self.real):
            assert element == self.observable[i]
        assert all(a == b for a, b in zip(self.real, self.observable))
        assert len(self.real) == len(self.observable)
        for element in self.real:
            assert self.real.count(element) == self.observable.count(element)
        real_is_same = self.real_before == self.real
        observable_is_same = self.observable == self.observable_before
        assert real_is_same == observable_is_same
        value, error = self.value
        observable_value, observable_error = self.observable_value
        assert value == observable_value, "return values are the same"
        assert error.__class__ == observable_error.__class__, ERROR_SAME
        if error is not None:
            assert error.args == observable_error.args, ERROR_SAME
        real_is_same = self.real_before is self.real
        observable_is_same = self.observable_before is self.observable
        assert real_is_same == observable_is_same, "identity of result is same"

    def assert_no_change(self):
        assert self.changes_before == self.changes_after

    def assert_one_more_change(self, changes=1):
        assert len(self.changes_before) == len(self.changes_after) - changes, \
            OBSERVER_IS_NOTIFIED

    def assert_add(self, index, elements):
        self.assert_one_more_change()
        self._assert_add(index, elements)

    def _assert_add(self, index, elements):
        self._assert_change(self.changes_after[-1], index, elements)
        for obj in (self.real, self.observable):
            self._assert_change_adds(obj, index, elements)

    def _assert_change(self, change_and_elements, index, elements, adds=True):
        change, change_elements = change_and_elements
        length = len(elements)
        if isinstance(index, int):
            slice_ = slice(index, index + length, 1)
        elif isinstance(index, slice):
            slice_ = index
        else:
            raise TypeError("Expected int or slice but not {}".format(index))
        assert change.adds() == adds
        assert change.removes() != adds
        assert change.start == slice_.start
        assert change.stop == slice_.stop
        assert change.length == length
        if adds:
            range_ = range(*slice_.indices(len(self.real)))
        else:
            range_ = range(*slice_.indices(len(self.real_before)))
        assert change.range == range_
        assert change.changed_object is self.observable
        assert change_elements == elements

    def _assert_change_adds(self, obj, index, elements):
        for element in elements:
            assert element in obj
        for i, element in enumerate(elements, index):
            assert obj[i] == element
        assert obj[index:index + len(elements)] == elements

    def assert_remove(self, index, elements):
        self.assert_one_more_change()
        self._assert_change(self.changes_after[-1], index, elements,
                            adds=False)

    def assert_replace(self, index, old_elements, new_elements):
        self.assert_one_more_change(2)
        self._assert_change(self.changes_after[-2], index, old_elements,
                            adds=False)
        self._assert_add(index, new_elements)


class ObservableChain(object):

    @other_self
    def __init__(self, real_list, observable_list, changes):
        self.real_list = real_list
        self.observable_list = observable_list
        self.changes = changes

    @other_self
    def __getattribute__(self, attr):
        if hasattr(self, attr):
            return getattr(self, attr)
        return self.call(attr)


@fixture
def chain(ol, changes):
    return ObservableChain([], ol, changes)


@fixture
def filled_chain(chain):
    chain.extend([1, 2, 3, 4])
    return chain


class TestInitialization:

    def test_empty_list_makes_no_changes(self, ol, changes):
        assert changes == []

    def test_empty_list_does_not_contain_anything(self, ol):
        for i in range(10):
            assert i not in ol


class TestObserver:

    def test_notify_observers_mock(self, ol):
        observer = MagicMock()
        change = MagicMock()
        ol.register_observer(observer)
        ol.notify_observers(change)
        observer.assert_called_with(change)

    def test_notify_observers(self, ol, changes):
        change = MagicMock()
        ol.notify_observers(change)
        assert len(changes) == 1
        assert changes[0][0] is change


# To test:
# def __init__(self, initlist=None):
# def __repr__(self):              tested in TestNoChanges
# def __lt__(self, other):         tested in TestNoChanges
# def __le__(self, other):         tested in TestNoChanges
# def __eq__(self, other):         tested in TestNoChanges
# def __gt__(self, other):         tested in TestNoChanges
# def __ge__(self, other):         tested in TestNoChanges
# def __contains__(self, item):    tested in TestNoChanges
# def __len__(self):               tested in TestNoChanges
# def __getitem__(self, i):        tested in TestItemMethods
# def __setitem__(self, i, item):  tested in TestItemMethods
# def __delitem__(self, i):        tested in TestItemMethods
# def __add__(self, other):        tested in TestNoChanges
# def __radd__(self, other):       untested, AttributeError
# def __iadd__(self, other):       tested in TestAddElements
# def __mul__(self, n):            tested in TestAddElements
# __rmul__ = __mul__               tested in TestAddElements
# def __imul__(self, n):           tested in TestAddElements
# def append(self, item):          tested in TestAddElements
# def insert(self, i, item):       tested in TestAddElements
# def pop(self, i=-1):             tested in TestRemoveElements
# def remove(self, item):          tested in TestRemoveElements
# def clear(self):                 tested in TestRemoveElements
# def copy(self):                  tested in TestNoChanges
# def count(self, item):           tested TestNoChanges
# def index(self, item, *args):    tested TestNoChanges
# def reverse(self):
# def sort(self, *args, **kwds):
# def extend(self, other):         tested in TestAddElements


class TestAddElements:

    def test_append_elements(self, chain):
        chain.append(3).assert_add(0, [3])
        chain.append(99).assert_add(1, [99])

    def test_insert_element_at_end(self, chain):
        chain.insert(0, 224).assert_add(0, [224])
        chain.insert(2, 223).assert_add(1, [223])
        chain.insert(-2, 222).assert_add(0, [222])
        chain.insert(-20, 228).assert_add(0, [228])
        chain.insert(20, 22).assert_add(4, [22])

    @pytest.mark.parametrize("method", ["extend", "__iadd__"])
    def test_extend(self, chain, method):
        chain.call(method)([9, 8, 7, 6]).assert_add(0, [9, 8, 7, 6])
        test = chain.call(method)(lambda: (str(i) for i in range(3)))
        test.assert_add(4, ["0", "1", "2"])
        chain.call(method)(()).assert_no_change()

    def test_imul(self, filled_chain):
        filled_chain.__imul__(3).assert_add(4, [1, 2, 3, 4, 1, 2, 3, 4])

    @pytest.mark.parametrize("argument", ["asd", 1])
    def test_no_change_for_some_arguments(self, filled_chain, argument):
        filled_chain.__imul__(argument).assert_no_change()

    @pytest.mark.parametrize("multiplier", [10, -10, 0, 1])
    def test_imul_is_empty(self, chain, multiplier):
        chain.__imul__(multiplier).assert_no_change()

    @pytest.mark.parametrize("multiplier", [0, -3])
    def test_imul_deletes(self, filled_chain, multiplier):
        filled_chain.__imul__(multiplier).assert_remove(0, [1, 2, 3, 4])


class TestRemoveElements:

    def test_pop(self, filled_chain):
        filled_chain.pop().assert_remove(3, [4])
        filled_chain.pop(-1).assert_remove(2, [3])

    def test_pop_with_positive_index(self, filled_chain):
        filled_chain.pop(1).assert_remove(1, [2])

    @pytest.mark.parametrize("multiplier", [-22, 22, -6, 6])
    def test_pop_invalid_argument(self, filled_chain, multiplier):
        filled_chain.pop(multiplier).assert_no_change()

    def test_pop_from_empty_list(self, chain):
        chain.pop().assert_no_change()

    def test_pop_type_error(self, filled_chain):
        filled_chain.pop("asd").assert_no_change()

    def test_remove(self, chain):
        chain.extend(range(30))
        chain.remove(10).assert_remove(10, [10])
        chain.remove(10).assert_no_change()

    @python3
    def test_clear(self, chain):
        chain.clear().assert_no_change()
        chain.extend([1, 2, 3, 4])
        chain.clear().assert_remove(0, [1, 2, 3, 4])
        chain.clear().assert_no_change()


class TestItemMethods:

    @pytest.mark.parametrize("argument", [4, -5, "123"])
    def test_delete_fails(self, filled_chain, argument):
        filled_chain.__delitem__(argument).assert_no_change()

    def test_delete_positive_index_inside(self, filled_chain):
        filled_chain.__delitem__(1).assert_remove(1, [2])

    def test_delete_negative_index_inside(self, filled_chain):
        filled_chain.__delitem__(-1).assert_remove(3, [4])

    def test_delete_slice_inside(self, filled_chain):
        filled_chain.__delitem__(slice(1, 3)).assert_remove(1, [2, 3])

    def test_delete_overlapping_slice(self, filled_chain):
        test = filled_chain.__delitem__(slice(-3, 3, 4))
        test.assert_remove(slice(1, 3, 4), [2])

    @pytest.mark.parametrize("index", [-10, -6, -5, -4, -1, 0, 1, 3, 4, 6, 10])
    def test_getitem(self, filled_chain, index):
        filled_chain.__getitem__(index).assert_no_change()

    def test_setitem(self, filled_chain):
        filled_chain.__setitem__(2, 8).assert_replace(2, [3], [8])


COMPARISONS = ["__lt__", "__le__", "__eq__", "__gt__", "__ge__"]
LISTS = [[1, 2, 3, 2], [2, 3, 4], [0, 1, 2, 3, 4]]
SEARCHES = ["count", "index", "__contains__"]
CONVERSIONS = ["__repr__", "__len__"]


class TestNoChanges:

    @python3
    def test_copy(self, chain):
        chain.copy().assert_no_change()
        chain.extend([2, 3, 657, 8])
        chain.copy().assert_no_change()

    @pytest.mark.parametrize("method", CONVERSIONS)
    def test_repr(self, chain, method):
        chain.call(method)().assert_no_change()
        chain.extend([1, 2, 3, 4])
        chain.call(method)().assert_no_change()

    @pytest.mark.parametrize("method", COMPARISONS)
    @pytest.mark.parametrize("other", LISTS)
    @pytest.mark.parametrize("elements", LISTS)
    def test_comparisons(self, chain, method, elements, other):
        chain.extend(elements)
        chain.call(method)(other).assert_no_change()

    @pytest.mark.parametrize("method", SEARCHES)
    @pytest.mark.parametrize("element", LISTS[0])
    @pytest.mark.parametrize("elements", LISTS)
    def test_element_search(self, chain, method, elements, element):
        chain.extend(elements)
        chain.call(method)(element).assert_no_change()

    @pytest.mark.parametrize("method", ["__rmul__", "__mul__", "__add__"])
    @pytest.mark.parametrize("parameter", [5, [4], "123"])
    def test_mul(self, chain, method, parameter):
        chain.extend([8, 4, 0, 1])
        chain.call(method)(parameter).assert_no_change()

ALL_METHODS = ["__init__", "__repr__", "__lt__", "__le__", "__eq__", "__gt__",
               "__ge__", "__contains__", "__len__", "__getitem__",
               "__setitem__", "__delitem__", "__add__", "__iadd__",
               "__mul__", "__rmul__", "__imul__", "append", "insert", "pop",
               "remove", "clear", "copy", "count", "index", "reverse", "sort",
               "extend"]
PYTHON3_ONLY_METHODS = ["clear", "copy"]


@pytest.mark.parametrize("method", ALL_METHODS)
def test_methods_have_the_description_and_help(method):
    if PY2 and method in PYTHON3_ONLY_METHODS:
        pytest.skip("Python 3 is required for method {0}".format(method))
    real = getattr(list, method)
    observable = getattr(ObservableList, method)
    assert real.__doc__ == observable.__doc__


def test_initialize_with_list():
    items = (1, 2, 3)
    assert ObservableList(items) == list(items)


@pytest.mark.parametrize("method", ["remove", "append"])
def test_change_to_string(filled_chain, method):
    change = filled_chain.call(method)(3).last_change
    string = str(change)
    assert string.startswith("<")
    assert string.endswith(">")
    assert "Change" in string