fuzeman/trakt.py

View on GitHub
trakt/core/context_collection.py

Summary

Maintainability
A
25 mins
Test Coverage


from trakt.core.helpers import synchronized

from threading import RLock
import _thread
import logging

log = logging.getLogger(__name__)


class ListCollection(object):
    """Present several underlying lists as one flat, index-addressable sequence.

    Every positional argument is either a list or a zero-argument callable
    returning one. Callables are invoked each time the underlying lists are
    walked, so they always reflect whatever they currently return. Items are
    addressed by a flat index running through the underlying lists in the
    order they were supplied.

    Most methods are wrapped with ``synchronized`` keyed on the instance's
    ``RLock``.
    NOTE(review): presumably the wrapper holds that lock for the duration of
    the call - confirm against ``trakt.core.helpers.synchronized``.
    """

    def __init__(self, *lists):
        # `lists` arrives as a tuple; an empty one is swapped for an empty list
        self._lists = lists or []
        self._lock = RLock()

    @synchronized(lambda self: self._lock)
    def append(self, value):
        """Append `value` to the last underlying list.

        The last entry is used as-is (a callable is *not* resolved here).

        :raises IndexError: if the collection was created without any lists
        :raises ValueError: if the last entry is not a list
        """
        target = self._lists[-1]

        # isinstance() so that list subclasses are accepted as well
        if not isinstance(target, list):
            raise ValueError('last collection is not a list')

        target.append(value)

    @synchronized(lambda self: self._lock)
    def find_list(self, index):
        """Locate the underlying list holding the item at flat `index`.

        :param index: flat position; negative values count from the end
        :return: ``(list, offset)`` tuple, or ``(None, None)`` when no list
                 covers the index (e.g. a negative index below ``-len(self)``)
        :raises IndexError: if `index` is greater than or equal to ``len(self)``
        """
        count = len(self)

        if index >= count:
            raise IndexError('list index out of range')

        if index < 0:
            index += count

        start = 0

        for lst in self.lists():
            end = start + len(lst)

            if start <= index < end:
                return lst, index - start

            start = end

        return None, None

    def _locate(self, index):
        """Return ``(list, offset)`` for flat `index`, raising IndexError if unmapped."""
        target, offset = self.find_list(index)

        if target is None:
            raise IndexError('list index out of range')

        return target, offset

    @synchronized(lambda self: self._lock)
    def lists(self, resolve=True):
        """Yield the underlying lists in the order they were supplied.

        :param resolve: when true, callable entries are called and their
                        result is yielded in place of the callable itself
        """
        # NOTE(review): this is a generator function, so the decorator
        # presumably only guards creation of the generator, not the
        # iteration itself - confirm against `synchronized`.
        for collection in self._lists:
            if resolve and callable(collection):
                collection = collection()

            yield collection

    @synchronized(lambda self: self._lock)
    def pop(self, index=None):
        """Remove and return the item at flat `index` (default: the last item).

        :raises IndexError: if the collection is empty or `index` is out of range
        """
        if index is None:
            index = len(self) - 1

        target, offset = self._locate(index)

        return target.pop(offset)

    @synchronized(lambda self: self._lock)
    def __eq__(self, other):
        """Compare item by item against any sized, indexable `other`."""
        try:
            other_len = len(other)
        except TypeError:
            # Unsized operands (None, numbers, ...) can't be compared
            # item-wise; let Python fall back to its default handling
            # instead of raising from `==`.
            return NotImplemented

        if len(self) != other_len:
            return False

        # Single pass over our items rather than re-locating every index
        for position, item in enumerate(self):
            if item != other[position]:
                return False

        return True

    @synchronized(lambda self: self._lock)
    def __contains__(self, value):
        """Return True if any item compares equal to `value`."""
        return any(x == value for x in self)

    @synchronized(lambda self: self._lock)
    def __getitem__(self, index):
        """Return the item at flat `index`.

        :raises IndexError: if `index` is out of range
        """
        target, offset = self._locate(index)

        return target[offset]

    @synchronized(lambda self: self._lock)
    def __iter__(self):
        """Iterate over the items of every underlying list, in order."""
        # NOTE(review): generator function - see the note in `lists()` about
        # what the decorator presumably does (not) cover here.
        for lst in self.lists():
            # Yield items from each list
            for x in lst:
                yield x

    @synchronized(lambda self: self._lock)
    def __len__(self):
        """Return the combined length of all underlying lists."""
        return sum(len(lst) for lst in self.lists())

    @synchronized(lambda self: self._lock)
    def __setitem__(self, index, value):
        """Replace the item at flat `index` with `value`.

        :raises IndexError: if `index` is out of range
        """
        target, offset = self._locate(index)

        target[offset] = value

    def __repr__(self):
        return '[%s]' % ', '.join(repr(x) for x in self)

    # Instances define `__eq__` and are mutable, so they are explicitly
    # marked as unhashable.
    __hash__ = None


class ContextCollection(object):
    """Keep one `ListCollection` per thread, layered on a shared base list.

    Each thread that uses the collection gets its own ``ListCollection``
    made of the shared ``base`` (looked up through a callable each time it
    is needed, so reassigning ``base`` is picked up) followed by a list
    private to that thread. Appended values go to the last list of that
    ``ListCollection``, i.e. the thread-private one.
    """

    def __init__(self, base=None):
        """
        :param base: list shared by every thread's view; a new empty list is
                     created when omitted
        """
        # Only substitute a fresh list when no base was given. `base or []`
        # would also discard a caller-supplied *empty* list, so the caller's
        # list and `self.base` would no longer be the same object.
        self.base = [] if base is None else base

        self._lock = RLock()
        # Maps thread identifier -> ListCollection for that thread
        self._threads = {}

    @synchronized(lambda self: self._lock)
    def build(self, ident):
        """Return the `ListCollection` for thread `ident`, creating it if needed."""
        if ident not in self._threads:
            self._threads[ident] = ListCollection(lambda: self.base, [])

        return self._threads[ident]

    @property
    def current(self):
        """`ListCollection` belonging to the calling thread."""
        ident = _thread.get_ident()

        try:
            # Common case: the thread already has a collection
            return self._threads[ident]
        except KeyError:
            # First access from this thread, `build()` re-checks for the
            # entry before creating it
            return self.build(ident)

    def append(self, value):
        """Append `value` to the calling thread's collection."""
        self.current.append(value)

    @synchronized(lambda self: self._lock)
    def clear(self):
        """Discard the calling thread's collection (no-op if it has none)."""
        ident = _thread.get_ident()

        if ident not in self._threads:
            return

        del self._threads[ident]

    def pop(self, index=None):
        """Remove and return an item from the calling thread's collection.

        :param index: flat index to remove, defaults to the last item
        """
        return self.current.pop(index)

    def __getitem__(self, index):
        return self.current[index]

    def __len__(self):
        return len(self.current)