trakt/core/context_collection.py
from trakt.core.helpers import synchronized
from threading import RLock
import _thread
import logging
log = logging.getLogger(__name__)
class ListCollection(object):
def __init__(self, *lists):
self._lists = lists or []
self._lock = RLock()
@synchronized(lambda self: self._lock)
def append(self, value):
collection = self._lists[-1]
if type(collection) is not list:
raise ValueError()
collection.append(value)
@synchronized(lambda self: self._lock)
def find_list(self, index):
count = len(self)
if index >= count:
raise IndexError()
if index < 0:
index += count
pos = 0
for lst in self.lists():
l_len = len(lst)
if pos <= index < pos + l_len:
return lst, index - pos
else:
pos += l_len
return None, None
@synchronized(lambda self: self._lock)
def lists(self, resolve=True):
for collection in self._lists:
if resolve and callable(collection):
collection = collection()
yield collection
@synchronized(lambda self: self._lock)
def pop(self, index=None):
if index is None:
index = len(self) - 1
list, index = self.find_list(index)
if list is None:
raise IndexError()
return list.pop(index)
@synchronized(lambda self: self._lock)
def __eq__(self, other):
if len(self) != len(other):
return False
for x in range(len(self)):
if self[x] != other[x]:
return False
return True
@synchronized(lambda self: self._lock)
def __contains__(self, value):
for x in self:
if x == value:
return True
return False
def __getitem__(self, index):
list, index = self.find_list(index)
if list is None:
raise IndexError()
return list[index]
@synchronized(lambda self: self._lock)
def __iter__(self):
for lst in self.lists():
# Yield items from each list
for x in lst:
yield x
@synchronized(lambda self: self._lock)
def __len__(self):
return sum([len(lst) for lst in self.lists()])
def __setitem__(self, index, value):
list, index = self.find_list(index)
if list is None:
raise IndexError()
list[index] = value
def __repr__(self):
return '[%s]' % ', '.join(repr(x) for x in self)
__hash__ = None
class ContextCollection(object):
def __init__(self, base=None):
self.base = base or []
self._lock = RLock()
self._threads = {}
@synchronized(lambda self: self._lock)
def build(self, ident):
if ident not in self._threads:
self._threads[ident] = ListCollection(lambda: self.base, [])
return self._threads[ident]
@property
def current(self):
ident = _thread.get_ident()
try:
return self._threads[ident]
except KeyError:
return self.build(ident)
def append(self, value):
self.current.append(value)
@synchronized(lambda self: self._lock)
def clear(self):
ident = _thread.get_ident()
if ident not in self._threads:
return
del self._threads[ident]
def pop(self, index=None):
return self.current.pop(index)
def __getitem__(self, index):
return self.current[index]
def __len__(self):
return len(self.current)