src/hunter/_event.pyx
# cython: linetrace=True, language_level=3str, c_string_encoding=ascii
from functools import partial
from linecache import getline
from linecache import getlines
from os.path import basename
from os.path import exists
from os.path import splitext
from threading import current_thread
from tokenize import TokenError
from tokenize import generate_tokens
from cpython.pythread cimport PyThread_get_thread_ident
from cpython.ref cimport Py_XINCREF
from cpython.ref cimport PyObject
from cython cimport auto_pickle
from ._tracer cimport Tracer
from .vendor._cymem.cymem cimport Pool
from .const import SITE_PACKAGES_PATHS
from .const import SYS_PREFIX_PATHS
from .util import CYTHON_SUFFIX_RE
from .util import LEADING_WHITESPACE_RE
from .util import MISSING
from .util import get_func_in_mro
from .util import get_main_thread
from .util import if_same_code
__all__ = 'Event',
cdef object UNSET = object()
cdef Pool mem = Pool()
cdef PyObject** KIND_NAMES = make_kind_names(['call', 'exception', 'line', 'return', 'call', 'exception', 'return'])
cdef inline PyObject** make_kind_names(list strings):
cdef PyObject** array = <PyObject**>mem.alloc(len(strings), sizeof(PyObject*))
cdef object name
for i, string in enumerate(strings):
name = intern(string)
Py_XINCREF(<PyObject*> name)
array[i] = <PyObject*> name
return <PyObject**>array
@auto_pickle(False)
cdef class Event:
"""
A wrapper object for Frame objects. Instances of this are passed to your custom functions or predicates.
Provides few convenience properties.
Args:
frame (Frame): A python `Frame <https://docs.python.org/3/reference/datamodel.html#frame-objects>`_ object.
kind (str): A string like ``'call'``, ``'line'``, ``'return'`` or ``'exception'``.
arg: A value that depends on ``kind``. Usually is ``None`` but for ``'return'`` or ``'exception'`` other values
may be expected.
tracer (:class:`hunter.tracer.Tracer`): The :class:`~hunter.tracer.Tracer` instance that created the event.
Needed for the ``calls`` and ``depth`` fields.
"""
def __init__(self, FrameType frame, int kind, object arg, Tracer tracer=None, object depth=None, object calls=None,
object threading_support=MISSING):
if tracer is None:
if depth is None:
raise TypeError('Missing argument: depth (required because tracer was not given).')
if calls is None:
raise TypeError('Missing argument: calls (required because tracer was not given).')
if threading_support is MISSING:
raise TypeError('Missing argument: threading_support (required because tracer was not given).')
else:
depth = tracer.depth
calls = tracer.calls
threading_support = tracer.threading_support
self.arg = arg
self.frame = frame
self.kind = <str> KIND_NAMES[kind]
self.depth = depth
self.calls = calls
self.threading_support = threading_support
self.detached = False
self.builtin = kind > 3
self._code = UNSET
self._filename = UNSET
self._fullsource = UNSET
self._function_object = UNSET
self._function = UNSET
self._globals = UNSET
self._lineno = UNSET
self._locals = UNSET
self._module = UNSET
self._source = UNSET
self._stdlib = UNSET
self._threadidn = UNSET
self._threadname = UNSET
self._thread = UNSET
self._instruction = UNSET
def __repr__(self):
return '<Event kind=%r function=%r module=%r filename=%r lineno=%s>' % (
self.kind, self.function, self.module, self.filename, self.lineno
)
def __eq__(self, other):
return self is other
def detach(self, value_filter=None):
return fast_detach(self, value_filter)
def clone(self):
return fast_clone(self)
cdef inline instruction_getter(self):
cdef int position
if self._instruction is UNSET:
position = PyFrame_GetLasti(self.frame)
co_code = PyCode_GetCode(self.code_getter())
if co_code and position >= 0:
self._instruction = co_code[position]
else:
self._instruction = None
return self._instruction
@property
def instruction(self):
return self.instruction_getter()
cdef inline threadid_getter(self):
cdef long current
if self._threadidn is UNSET:
current = PyThread_get_thread_ident()
main = get_main_thread()
if main is not None and current == main.ident:
self._threadidn = None
else:
self._threadidn = current
return self._threadidn
@property
def threadid(self):
return self.threadid_getter()
cdef inline threadname_getter(self):
if self._threadname is UNSET:
if self._thread is UNSET:
self._thread = current_thread()
self._threadname = self._thread.name
return self._threadname
@property
def threadname(self):
return self.threadname_getter()
cdef inline locals_getter(self):
if self._locals is UNSET:
if self.builtin:
self._locals = {}
else:
PyFrame_FastToLocals(self.frame)
self._locals = PyFrame_GetLocals(self.frame)
return self._locals
@property
def locals(self):
return self.locals_getter()
cdef inline globals_getter(self):
if self._globals is UNSET:
if self.builtin:
self._locals = {}
else:
self._globals = PyFrame_GetGlobals(self.frame)
return self._globals
@property
def globals(self):
return self.globals_getter()
cdef inline function_getter(self):
if self._function is UNSET:
if self.builtin:
self._function = self.arg.__name__
else:
self._function = self.code_getter().co_name
return self._function
@property
def function(self):
return self.function_getter()
@property
def function_object(self):
cdef CodeType code
if self.builtin:
return self.builtin
elif self._function_object is UNSET:
code = self.code_getter()
if code.co_name is None:
return None
# First, try to find the function in globals
candidate = self.globals.get(code.co_name, None)
func = if_same_code(candidate, code)
# If that failed, as will be the case with class and instance methods, try
# to look up the function from the first argument. In the case of class/instance
# methods, this should be the class (or an instance of the class) on which our
# method is defined.
if func is None and code.co_argcount >= 1:
first_arg = self.locals.get(PyCode_GetVarnames(code)[0])
func = get_func_in_mro(first_arg, code)
# If we still can't find the function, as will be the case with static methods,
# try looking at classes in global scope.
if func is None:
for v in self.globals.values():
if not isinstance(v, type):
continue
func = get_func_in_mro(v, code)
if func is not None:
break
self._function_object = func
return self._function_object
cdef inline module_getter(self):
if self._module is UNSET:
if self.builtin:
module = self.arg.__module__
else:
module = self.globals.get('__name__', '')
if module is None:
module = '?'
self._module = module
return self._module
@property
def module(self):
return self.module_getter()
cdef inline filename_getter(self):
cdef CodeType code
if self._filename is UNSET:
code = self.code_getter()
filename = code.co_filename
if not filename:
filename = self.globals.get('__file__')
if not filename:
filename = '?'
elif filename.endswith(('.pyc', '.pyo')):
filename = filename[:-1]
elif filename.endswith(('.so', '.pyd')):
cybasename = CYTHON_SUFFIX_RE.sub('', filename)
for ext in ('.pyx', '.py'):
cyfilename = cybasename + ext
if exists(cyfilename):
filename = cyfilename
break
self._filename = filename
return self._filename
@property
def filename(self):
return self.filename_getter()
cdef inline lineno_getter(self):
if self._lineno is UNSET:
self._lineno = PyFrame_GetLineNumber(self.frame)
return self._lineno
@property
def lineno(self):
return self.lineno_getter()
cdef inline CodeType code_getter(self):
if self._code is UNSET:
return PyFrame_GetCode(self.frame)
else:
return self._code
@property
def code(self):
return self.code_getter()
cdef inline stdlib_getter(self):
if self._stdlib is UNSET:
module_parts = self.module.split('.')
if 'pkg_resources' in module_parts:
# skip this over-vendored module
self._stdlib = True
elif self.filename == '<string>' and (self.module.startswith('namedtuple_') or self.module == 'site'):
# skip namedtuple exec garbage
self._stdlib = True
elif self.filename.startswith(SITE_PACKAGES_PATHS):
# if in site-packages then definitely not stdlib
self._stdlib = False
elif self.filename.startswith(SYS_PREFIX_PATHS):
self._stdlib = True
else:
self._stdlib = False
return self._stdlib
@property
def stdlib(self):
return self.stdlib_getter()
cdef inline fullsource_getter(self):
cdef list lines
cdef CodeType code
if self._fullsource is UNSET:
try:
self._fullsource = None
code = self.code_getter()
if self.kind == 'call' and code.co_name != '<module>':
lines = []
try:
for _, token, _, _, _ in generate_tokens(
partial(
next,
yield_lines(
self.filename,
self.frame.f_globals,
self.lineno - 1,
lines,
),
)
):
if token in ('def', 'class', 'lambda'):
self._fullsource = ''.join(lines)
break
except TokenError:
pass
if self._fullsource is None:
self._fullsource = getline(self.filename, self.lineno, self.globals)
except Exception as exc:
self._fullsource = f'??? NO SOURCE: {exc!r}'
return self._fullsource
@property
def fullsource(self):
return self.fullsource_getter()
cdef inline source_getter(self):
if self._source is UNSET:
if self.filename.endswith(('.so', '.pyd')):
self._source = f'??? NO SOURCE: not reading binary {splitext(basename(self.filename))[1]} file'
try:
self._source = getline(self.filename, self.lineno, self.globals)
except Exception as exc:
self._source = f'??? NO SOURCE: {exc!r}'
return self._source
@property
def source(self):
return self.source_getter()
def __getitem__(self, item):
return getattr(self, item)
def yield_lines(filename, module_globals, start, list collector,
limit=10):
dedent = None
amount = 0
for line in getlines(filename, module_globals)[start:start + limit]:
if dedent is None:
dedent = LEADING_WHITESPACE_RE.findall(line)
dedent = dedent[0] if dedent else ''
amount = len(dedent)
elif not line.startswith(dedent):
break
collector.append(line)
yield line[amount:]
cdef inline Event fast_detach(Event self, object value_filter):
event = <Event> Event.__new__(Event)
event._code = self.code_getter()
event._filename = self.filename_getter()
event._fullsource = self.fullsource_getter()
event._function_object = self._function_object
event._function = self.function_getter()
event._lineno = self.lineno_getter()
event._module = self.module_getter()
event._source = self.source_getter()
event._stdlib = self.stdlib_getter()
event._threadidn = self.threadid_getter()
event._threadname = self.threadname_getter()
event._instruction = self.instruction_getter()
if value_filter:
event._globals = {key: value_filter(value) for key, value in self.globals.items()}
event._locals = {key: value_filter(value) for key, value in self.locals.items()}
event.arg = value_filter(self.arg)
else:
event._globals = {}
event._locals = {}
event.arg = None
event.builtin = self.builtin
event.calls = self.calls
event.depth = self.depth
event.detached = True
event.kind = self.kind
event.threading_support = self.threading_support
return event
cdef inline Event fast_clone(Event self):
event = <Event> Event.__new__(Event)
event.arg = self.arg
event.builtin = self.builtin
event.calls = self.calls
event.depth = self.depth
event.detached = False
event.frame = self.frame
event.kind = self.kind
event.threading_support = self.threading_support
event._code = self._code
event._filename = self._filename
event._fullsource = self._fullsource
event._function_object = self._function_object
event._function = self._function
event._globals = self._globals
event._lineno = self._lineno
event._locals = self._locals
event._module = self._module
event._source = self._source
event._stdlib = self._stdlib
event._threadidn = self._threadidn
event._threadname = self._threadname
event._thread = self._thread
event._instruction = self._instruction
return event