src/hunter/_predicates.pyx
# cython: linetrace=True, language_level=3str, c_api_binop_methods=True
from __future__ import absolute_import
from collections import deque
from inspect import isclass
from re import compile as re_compile
cimport cython
from ._event cimport Event
from ._event cimport fast_clone
from ._event cimport fast_detach
from ._tracer cimport *
from .actions import Action
from .actions import ColorStreamAction
__all__ = (
'And',
'From',
'Not',
'Or',
'Query',
'When',
)
cdef tuple ALLOWED_KEYS = (
'function', 'module', 'lineno', 'globals', 'stdlib', 'arg', 'locals', 'kind', 'filename', 'source',
'fullsource', 'threadname', 'threadid', 'instruction', 'depth', 'calls', 'builtin',
)
cdef tuple ALLOWED_OPERATORS = (
'startswith', 'endswith', 'in', 'contains', 'regex',
'sw', 'ew', 'has', 'rx',
'gt', 'gte', 'lt', 'lte',
)
ctypedef object (*Event_getter_typedef)(Event)
cdef inline Event_get_function(Event event): return event.function_getter()
cdef inline Event_get_module(Event event): return event.module_getter()
cdef inline Event_get_lineno(Event event): return event.lineno_getter()
cdef inline Event_get_globals(Event event): return event.globals_getter()
cdef inline Event_get_stdlib(Event event): return event.stdlib_getter()
cdef inline Event_get_arg(Event event): return event.arg
cdef inline Event_get_locals(Event event): return event.locals_getter()
cdef inline Event_get_kind(Event event): return event.kind
cdef inline Event_get_filename(Event event): return event.filename_getter()
cdef inline Event_get_source(Event event): return event.source_getter()
cdef inline Event_get_fullsource(Event event): return event.fullsource_getter()
cdef inline Event_get_threadname(Event event): return event.threadname_getter()
cdef inline Event_get_threadid(Event event): return event.threadid_getter()
cdef inline Event_get_instruction(Event event): return event.instruction_getter()
cdef inline Event_get_depth(Event event): return event.depth
cdef inline Event_get_calls(Event event): return event.calls
cdef inline Event_get_builtin(Event event): return event.builtin
cdef Event_getter_typedef[17] Event_getters = [
Event_get_function,
Event_get_module,
Event_get_lineno,
Event_get_globals,
Event_get_stdlib,
Event_get_arg,
Event_get_locals,
Event_get_kind,
Event_get_filename,
Event_get_source,
Event_get_fullsource,
Event_get_threadname,
Event_get_threadid,
Event_get_instruction,
Event_get_depth,
Event_get_calls,
Event_get_builtin,
]
@cython.final
cdef class QueryEntry:
cdef Event_getter_typedef getter
cdef int getter_index
cdef object value
def __init__(self, object value, str name):
self.value = value
self.getter_index = ALLOWED_KEYS.index(name)
self.getter = Event_getters[self.getter_index]
def __repr__(self):
return repr(self.value)
def __eq__(self, other):
return (
isinstance(other, QueryEntry)
and self.value == (<QueryEntry> other).value
and self.getter_index == (<QueryEntry> other).getter_index
)
@cython.final
cdef class Query:
"""
A query class.
See :class:`hunter.event.Event` for fields that can be filtered on.
"""
def __init__(self, **query):
"""
Args:
query: criteria to match on.
Accepted arguments:
``arg``,
``calls``,
``code``,
``depth``,
``filename``,
``frame``,
``fullsource``,
``function``,
``globals``,
``kind``,
``lineno``,
``locals``,
``module``,
``source``,
``stdlib``,
``threadid``,
``threadname``.
"""
query_eq = {}
query_startswith = {}
query_endswith = {}
query_in = {}
query_contains = {}
query_regex = {}
query_lt = {}
query_lte = {}
query_gt = {}
query_gte = {}
for key, value in query.items():
parts = [p for p in key.split('_') if p]
count = len(parts)
if count > 2:
raise TypeError(
f'Unexpected argument {key!r}. Must be one of {ALLOWED_KEYS} with optional operators like: {ALLOWED_OPERATORS}'
)
elif count == 2:
prefix, operator = parts
if operator in ('startswith', 'sw'):
if not isinstance(value, basestring):
if not isinstance(value, (list, set, tuple)):
raise ValueError(f'Value {value!r} for {key!r} is invalid. Must be a string, list, tuple or set.')
value = tuple(value)
mapping = query_startswith
elif operator in ('endswith', 'ew'):
if not isinstance(value, basestring):
if not isinstance(value, (list, set, tuple)):
raise ValueError(f'Value {value!r} for {key!r} is invalid. Must be a string, list, tuple or set.')
value = tuple(value)
mapping = query_endswith
elif operator == 'in':
mapping = query_in
elif operator in ('contains', 'has'):
mapping = query_contains
elif operator in ('regex', 'rx'):
value = re_compile(value)
mapping = query_regex
elif operator == 'lt':
mapping = query_lt
elif operator == 'lte':
mapping = query_lte
elif operator == 'gt':
mapping = query_gt
elif operator == 'gte':
mapping = query_gte
else:
raise TypeError(f'Unexpected operator {operator!r}. Must be one of {ALLOWED_OPERATORS}.')
else:
mapping = query_eq
prefix = key
if prefix not in ALLOWED_KEYS:
raise TypeError(f'Unexpected argument {key!r}. Must be one of {ALLOWED_KEYS}.')
mapping[prefix] = QueryEntry(value, prefix)
self.query_eq = tuple(sorted(query_eq.items()))
self.query_startswith = tuple(sorted(query_startswith.items()))
self.query_endswith = tuple(sorted(query_endswith.items()))
self.query_in = tuple(sorted(query_in.items()))
self.query_contains = tuple(sorted(query_contains.items()))
self.query_regex = tuple(sorted(query_regex.items()))
self.query_lt = tuple(sorted(query_lt.items()))
self.query_lte = tuple(sorted(query_lte.items()))
self.query_gt = tuple(sorted(query_gt.items()))
self.query_gte = tuple(sorted(query_gte.items()))
def __str__(self):
return 'Query(%s)' % (
', '.join([
', '.join(f'{key}{kind}={value!r}' for key, value in mapping)
for kind, mapping in [
('', self.query_eq),
('_in', self.query_in),
('_contains', self.query_contains),
('_startswith', self.query_startswith),
('_endswith', self.query_endswith),
('_regex', self.query_regex),
('_lt', self.query_lt),
('_lte', self.query_lte),
('_gt', self.query_gt),
('_gte', self.query_gte),
]
if mapping
])
)
def __repr__(self):
return '<hunter.predicates.Query: %s>' % ' '.join([
fmt % (mapping,)
for fmt, mapping in [
('query_eq=%r', self.query_eq),
('query_in=%r', self.query_in),
('query_contains=%r', self.query_contains),
('query_startswith=%r', self.query_startswith),
('query_endswith=%r', self.query_endswith),
('query_regex=%r', self.query_regex),
('query_lt=%r', self.query_lt),
('query_lte=%r', self.query_lte),
('query_gt=%r', self.query_gt),
('query_gte=%r', self.query_gte),
]
if mapping
])
def __eq__(self, other):
return (
isinstance(other, Query)
and self.query_eq == (<Query> other).query_eq
and self.query_startswith == (<Query> other).query_startswith
and self.query_endswith == (<Query> other).query_endswith
and self.query_in == (<Query> other).query_in
and self.query_contains == (<Query> other).query_contains
and self.query_regex == (<Query> other).query_regex
and self.query_lt == (<Query> other).query_lt
and self.query_lte == (<Query> other).query_lte
and self.query_gt == (<Query> other).query_gt
and self.query_gte == (<Query> other).query_gte
)
def __call__(self, Event event):
return fast_Query_call(self, event)
def __or__(self, other):
return Or(self, other)
def __and__(self, other):
return And(self, other)
def __invert__(self):
return Not(self)
cdef inline fast_Query_call(Query self, Event event):
for key, entry in self.query_eq:
value_from_event = (<QueryEntry> entry).getter(event)
if value_from_event != (<QueryEntry> entry).value:
return False
for key, entry in self.query_in:
value_from_event = (<QueryEntry> entry).getter(event)
if (<str?>value_from_event) not in (<QueryEntry> entry).value:
return False
for key, entry in self.query_contains:
value_from_event = (<QueryEntry> entry).getter(event)
if (<QueryEntry> entry).value not in (<str?>value_from_event):
return False
for key, entry in self.query_startswith:
value_from_event = (<QueryEntry> entry).getter(event)
if not (<str?>value_from_event).startswith((<QueryEntry> entry).value):
return False
for key, entry in self.query_endswith:
value_from_event = (<QueryEntry> entry).getter(event)
if not (<str?>value_from_event).endswith((<QueryEntry> entry).value):
return False
for key, entry in self.query_regex:
value_from_event = (<QueryEntry> entry).getter(event)
if not (<QueryEntry> entry).value.match(value_from_event):
return False
for key, entry in self.query_gt:
value_from_event = (<QueryEntry> entry).getter(event)
if not value_from_event > (<QueryEntry> entry).value:
return False
for key, entry in self.query_gte:
value_from_event = (<QueryEntry> entry).getter(event)
if not value_from_event >= (<QueryEntry> entry).value:
return False
for key, entry in self.query_lt:
value_from_event = (<QueryEntry> entry).getter(event)
if not value_from_event < (<QueryEntry> entry).value:
return False
for key, entry in self.query_lte:
value_from_event = (<QueryEntry> entry).getter(event)
if not value_from_event <= (<QueryEntry> entry).value:
return False
return True
@cython.final
cdef class When:
"""
Runs ``actions`` when ``condition(event)`` is ``True``.
Actions take a single ``event`` argument.
"""
def __init__(self, condition, *actions):
if not actions:
raise TypeError('Must give at least one action.')
self.condition = condition
self.actions = tuple(
action() if isclass(action) and issubclass(action, Action) else action
for action in actions)
def __str__(self):
return 'When(%s, %s)' % (
self.condition,
', '.join(repr(p) for p in self.actions)
)
def __repr__(self):
return '<hunter._predicates.When: condition=%r, actions=%r>' % (self.condition, self.actions)
def __eq__(self, other):
return (
isinstance(other, When)
and self.condition == (<When> other).condition
and self.actions == (<When> other).actions
)
def __call__(self, Event event):
return fast_When_call(self, event)
def __or__(self, other):
return Or(self, other)
def __and__(self, other):
return And(self, other)
def __invert__(self):
return Not(self)
cdef inline fast_When_call(When self, Event event):
cdef object result
result = fast_call(self.condition, event)
if result:
for action in self.actions:
action(event)
return result
@cython.final
cdef class From:
"""
Keep running ``predicates`` after ``condition(event)`` is ``True``.
"""
def __init__(self, condition, predicate=None, watermark=0):
self.condition = condition
self.predicate = predicate
self.watermark = watermark
self.origin_depth = -1
self.origin_calls = -1
def __str__(self):
return 'From(%s, %s, watermark=%s)' % (
self.condition, self.predicate, self.watermark
)
def __repr__(self):
return '<hunter._predicates.From: condition=%r, predicate=%r, watermark=%r>' % (
self.condition, self.predicate, self.watermark
)
def __eq__(self, other):
return (
isinstance(other, From)
and self.condition == (<From> other).condition
and self.predicate == (<From> other).predicate
)
def __call__(self, Event event):
return fast_From_call(self, event)
def __or__(self, other):
return Or(self, other)
def __and__(self, other):
return And(self, other)
def __invert__(self):
return Not(self)
cdef inline fast_From_call(From self, Event event):
cdef object result
cdef int delta_depth
cdef int delta_calls
if self.origin_depth == -1:
result = fast_call(self.condition, event)
if result:
self.origin_depth = event.depth
self.origin_calls = event.calls
delta_depth = delta_calls = 0
else:
return False
else:
delta_depth = event.depth - self.origin_depth
delta_calls = event.calls - self.origin_calls
if delta_depth < self.watermark:
self.origin_depth = -1
return False
if self.predicate is None:
return True
else:
relative_event = fast_clone(event)
relative_event.depth = delta_depth
relative_event.calls = delta_calls
return fast_call(self.predicate, relative_event)
@cython.final
cdef class And:
"""
`And` predicate. Exits at the first sub-predicate that returns ``False``.
"""
def __init__(self, *predicates):
self.predicates = predicates
def __str__(self):
return 'And(%s)' % ', '.join(str(p) for p in self.predicates)
def __repr__(self):
return '<hunter._predicates.And: predicates=%r>' % (self.predicates,)
def __eq__(self, other):
return (
isinstance(other, And)
and self.predicates == (<And> other).predicates
)
def __call__(self, Event event):
return fast_And_call(self, event)
def __or__(self, other):
return Or(self, other)
def __and__(self, other):
cdef list predicates
if type(self) is And:
predicates = list((<And> self).predicates)
else:
predicates = [self]
if isinstance(other, And):
predicates.extend((<And> other).predicates)
else:
predicates.append(other)
return And(*predicates)
def __invert__(self):
return Not(self)
cdef inline fast_And_call(And self, Event event):
for predicate in self.predicates:
if not fast_call(predicate, event):
return False
else:
return True
@cython.final
cdef class Or:
"""
`Or` predicate. Exits at first sub-predicate that returns ``True``.
"""
def __init__(self, *predicates):
self.predicates = predicates
def __str__(self):
return 'Or(%s)' % ', '.join(str(p) for p in self.predicates)
def __repr__(self):
return '<hunter._predicates.Or: predicates=%r>' % (self.predicates,)
def __eq__(self, other):
return (
isinstance(other, Or)
and self.predicates == (<Or> other).predicates
)
def __call__(self, Event event):
return fast_Or_call(self, event)
def __or__(self, other):
cdef list predicates
if type(self) is Or:
predicates = list((<Or> self).predicates)
else:
predicates = [self]
if type(other) is Or:
predicates.extend((<Or> other).predicates)
else:
predicates.append(other)
return Or(*predicates)
def __and__(self, other):
return And(self, other)
def __invert__(self):
return Not(self)
cdef inline fast_Or_call(Or self, Event event):
for predicate in self.predicates:
if fast_call(predicate, event):
return True
else:
return False
cdef class Not:
"""
`Not` predicate.
"""
def __init__(self, predicate):
self.predicate = predicate
def __str__(self):
return 'Not(%s)' % self.predicate
def __repr__(self):
return '<hunter._predicates.Not: predicate=%r>' % self.predicate
def __eq__(self, other):
return (
isinstance(other, Not)
and self.predicate == (<Not> other).predicate
)
def __call__(self, Event event):
return fast_Not_call(self, event)
def __or__(self, other):
if type(self) is Not and type(other) is Not:
return Not(And((<Not> self).predicate, (<Not> other).predicate))
else:
return Or(self, other)
def __and__(self, other):
if type(self) is Not and type(other) is Not:
return Not(Or((<Not> self).predicate, (<Not> other).predicate))
else:
return And(self, other)
def __invert__(self):
return self.predicate
cdef inline fast_Not_call(Not self, Event event):
return not fast_call(self.predicate, event)
cdef inline fast_call(callable, Event event):
if type(callable) is Query:
return fast_Query_call(<Query> callable, event)
elif type(callable) is Or:
return fast_Or_call(<Or> callable, event)
elif type(callable) is And:
return fast_And_call(<And> callable, event)
elif type(callable) is Not:
return fast_Not_call(<Not> callable, event)
elif type(callable) is When:
return fast_When_call(<When> callable, event)
elif type(callable) is From:
return fast_From_call(<From> callable, event)
elif type(callable) is Backlog:
return fast_Backlog_call(<Backlog> callable, event)
else:
return callable(event)
@cython.final
cdef class Backlog:
def __init__(self, condition, size=100, stack=10, vars=False, strip=True, action=None, filter=None):
self.action = action() if isclass(action) and issubclass(action, Action) else action
if not isinstance(self.action, ColorStreamAction):
raise TypeError("Action %r must be a ColorStreamAction." % self.action)
self.condition = condition
self.queue = deque(maxlen=size)
self.size = size
self.stack = stack
self.strip = strip
self.vars = vars
self._try_repr = self.action.try_repr if self.vars else None
self._filter = filter
def __call__(self, event):
return fast_Backlog_call(self, event)
def __str__(self):
return 'Backlog(%s, size=%s, stack=%s, vars=%s, action=%s, filter=%s)' % (
self.condition, self.size, self.stack, self.vars, self.action, self._filter
)
def __repr__(self):
return '<hunter.predicates.Backlog: condition=%r, size=%r, stack=%r, vars=%r, action=%r, filter=%r>' % (
self.condition, self.size, self.stack, self.vars, self.action, self._filter
)
def __eq__(self, other):
return (
isinstance(other, Backlog) and
self.condition == (<Backlog> other).condition and
self.size == (<Backlog> other).size and
self.stack == (<Backlog> other).stack and
self.vars == (<Backlog> other).vars and
self.action == (<Backlog> other).action
)
def __or__(self, other):
return Or(self, other)
def __and__(self, other):
return And(self, other)
def __invert__(self):
return Backlog(Not(self.condition), size=self.size, stack=self.stack, vars=self.vars, action=self.action, filter=self._filter)
def filter(self, *predicates, **kwargs):
from hunter import _merge
if self._filter is not None:
predicates = (self._filter, *predicates)
return Backlog(
self.condition,
size=self.size, stack=self.stack, vars=self.vars, action=self.action,
filter=_merge(*predicates, **kwargs)
)
cdef inline fast_Backlog_call(Backlog self, Event event):
cdef bint first_is_call
cdef Event detached_event
cdef Event first_event
cdef Event stack_event
cdef FrameType first_frame
cdef FrameType frame
cdef int backlog_call_depth
cdef int depth_delta
cdef int first_depth
cdef int missing_depth
cdef object result
cdef object stack_events
result = fast_call(self.condition, event)
if result:
if self.queue:
self.action.cleanup()
first_event = <Event> self.queue[0]
first_depth = first_event.depth
backlog_call_depth = event.depth - first_depth
first_is_call = first_event.kind == 'call' # note that True is 1, thus the following math is valid
missing_depth = min(first_depth, max(0, self.stack - backlog_call_depth + first_is_call))
if missing_depth:
if first_is_call and first_event.frame is not None:
first_frame = first_event.frame.f_back
else:
first_frame = first_event.frame
if first_frame is not None:
stack_events = deque() # a new deque because self.queue is limited, we can't add while it's full
frame = first_frame
depth_delta = 0
while frame and depth_delta < missing_depth:
stack_event = Event(
frame=frame, kind=0, arg=None,
threading_support=event.threading_support,
depth=first_depth - depth_delta - 1, calls=-1
)
if not self.vars:
# noinspection PyPropertyAccess
stack_event._locals = {}
stack_event._globals = {}
stack_event.detached = True
stack_events.appendleft(stack_event)
frame = frame.f_back
depth_delta += 1
for stack_event in stack_events:
if self._filter is None or self._filter(stack_event):
self.action(stack_event)
for backlog_event in self.queue:
if self._filter is None:
self.action(backlog_event)
elif fast_call(self._filter, <Event> backlog_event):
self.action(backlog_event)
self.queue.clear()
else:
if self.strip and event.depth < 1:
# Looks like we're back to depth 0 for some reason.
# Delete everything because we don't want to see what is likely just a long stream of useless returns.
self.queue.clear()
if self._filter is None or self._filter(event):
detached_event = fast_detach(event, self._try_repr)
detached_event.frame = event.frame
self.queue.append(detached_event)
return result