sidekick-functions/sidekick/functions/lib_runtime.py
import time
from functools import wraps
from .core_functions import quick_fn
from .fn import fn, to_callable
from .lib_combinators import always
from .._utils import to_raisable, is_raisable
from ..typing import (
NOT_GIVEN,
TYPE_CHECKING,
Func,
Catchable,
Literal,
Callable,
overload,
Any,
Union,
Dict,
T,
)
# Runtime placeholders: when this module is imported normally these three
# names are bound to None. The real ``Result`` class is only imported in the
# TYPE_CHECKING branch below, which does not run at runtime.
Err = Ok = Result = None

# Imports that exist only for static type checkers; ``sk``, ``X`` and ``Y``
# are the names referenced by the docstring examples in this module.
if TYPE_CHECKING:
    from .. import api as sk # noqa: F401
    from ..api import X, Y # noqa: F401
    from ..types.maybe import Maybe # noqa: F401
    from ..types.result import Result # noqa: F401

# NOTE(review): presumably read by the doctest collector so that the examples
# in ``result`` are not executed -- confirm against the test configuration.
__doctest_skip__ = ["result"]
@fn
def once(func: Func) -> fn:
    """
    Restrict a function to a single effective invocation.

    The wrapper calls ``func`` with whatever arguments it receives and stores
    the returned value. Once a value has been stored, every later call
    returns it directly, ignoring the arguments and without calling ``func``
    again.

    Args:
        func:
            Function (or any object accepted by ``to_callable``) to wrap.

    Returns:
        The wrapped function.

    Examples:
        This is useful to wrap initialization routines or singleton factories.

        >>> @sk.once
        ... def configure():
        ...     print('setting up...')
        ...     return {'status': 'ok'}
        >>> configure()
        setting up...
        {'status': 'ok'}

    See Also:
        :func:`thunk`
        :func:`call_after`
        :func:`call_at_most`
    """
    func = to_callable(func)

    # The branch below never runs: it only makes ``cached`` a local of this
    # scope, so the inner function can declare it ``nonlocal`` while it is
    # still unbound. Reading the unbound name raises NameError, which marks
    # the "not computed yet" state. The original rationale is that the common
    # path (value already computed) is cheaper inside a try/except than a
    # comparison against a "not given" sentinel.
    if False:
        cached = None  # noqa

    @wraps(func)
    @quick_fn
    def once_fn(*args, **kwargs):
        nonlocal cached
        try:
            stored = cached
        except NameError:
            # First successful call: compute and remember the value.
            cached = func(*args, **kwargs)
            return cached
        else:
            return stored

    return once_fn
@overload
def thunk(
    func: type(Ellipsis), /, *args, **kwargs
) -> Callable[[Callable], Callable[[], Any]]:
    ...


@overload
def thunk(func: Callable, /, *args, **kwargs) -> Callable[[], Any]:
    ...


@fn
def thunk(func, /, *args, **kwargs):
    """
    Build a thunk: a zero-argument function standing for a lazy computation.

    The returned function calls ``func(*args, **kwargs)`` the first time it
    is invoked, stores the value and returns that stored value on every
    subsequent invocation.

    If the first argument is an ellipsis (``...``), a decorator is returned
    instead; it turns the decorated function into a thunk that uses the
    given positional and keyword arguments.

    Args:
        func:
            Function to evaluate lazily, or ``...`` to obtain a decorator.
        *args, **kwargs:
            Arguments forwarded to ``func`` when the value is computed.

    Example:
        >>> conf = sk.thunk(dict)
        >>> conf() is conf() == {}
        True

        >>> @sk.thunk(..., host='localhost', port=5432)
        ... def db(host, port):  # noqa
        ...     print(f'connecting to SQL server at {host}:{port}...')
        ...     return {'host': host, 'port': port}
        >>> db()
        connecting to SQL server at localhost:5432...
        {'host': 'localhost', 'port': 5432}
        >>> db()
        {'host': 'localhost', 'port': 5432}

    See Also:
        :func:`once`
    """
    # Decorator form: defer until the actual function is supplied.
    if func is ...:
        return lambda f: thunk(f, *args, **kwargs)

    # The branch below never runs: it only makes ``cached`` a local of this
    # scope so that ``get_value`` can declare it ``nonlocal`` while it is
    # still unbound. Reading the unbound name raises NameError, which marks
    # the "not computed yet" state. The original rationale is that returning
    # the pre-computed value is cheaper inside a try/except than a comparison
    # against a "not given" sentinel.
    if False:
        cached = None  # noqa

    @wraps(func)
    def get_value() -> Any:
        nonlocal cached
        try:
            stored = cached
        except NameError:
            cached = func(*args, **kwargs)
            return cached
        else:
            return stored

    # Lambda golf:
    # thunk = lambda f, *args: (lambda v=f(*args): lambda: v)()
    return get_value
@fn.curry(2)
def call_after(n: int, func: Func, *, default=None) -> fn:
    """
    Create a function that only starts invoking func after n calls.

    The first ``n`` calls of the created function return ``default`` without
    touching ``func``. From call ``n + 1`` onwards the arguments are forwarded
    to ``func`` and its result is returned.

    Args:
        n:
            Number of calls to absorb before func starts being invoked.
        func:
            Function to be invoked.
        default:
            Value returned before func() starts being called.

    Raises:
        ValueError: if ``n`` is zero or negative.

    Example:
        >>> f = sk.call_after(2, (X * 2), default=0)
        >>> [f(1), f(2), f(3), f(4), ...]
        [0, 0, 6, 8, ...]

    See Also:
        :func:`once`
    """
    if n <= 0:  # nocover
        raise ValueError("n must be positive")

    pending = n

    @fn.wraps(func)
    def after(*args, **kwargs):
        nonlocal pending
        if pending != 0:
            # Still absorbing calls: consume one and hand back the default.
            pending -= 1
            return default
        return target(*args, **kwargs)

    # Resolved after the wrapper is defined (the wrapper looks the name up at
    # call time), so ``fn.wraps`` above receives the argument as given.
    target = to_callable(func)
    return after
@fn.curry(2)
def call_at_most(n: int, func: Func) -> fn:
    """
    Create a function that invokes func for at most n calls.

    The first ``n`` calls are forwarded to ``func``. After that, the created
    function stops invoking ``func`` and keeps returning the result of the
    last invocation.

    Args:
        n:
            The number of calls at which func is no longer invoked.
        func:
            Function to restrict.

    Raises:
        ValueError: if ``n`` is zero or negative.

    Examples:
        >>> log = sk.call_at_most(2, print)
        >>> log("error1"); log("error2"); log("error3"); log("error4")
        error1
        error2

    See Also:
        :func:`once`
        :func:`call_after`
    """
    if n <= 0:  # nocover
        raise ValueError("n must be positive")

    calls_left = n
    last_value = None  # noqa

    @fn.wraps(func)
    def at_most(*args, **kwargs):
        nonlocal calls_left, last_value
        if calls_left != 0:
            # The budget is consumed before the call, so an invocation that
            # raises still counts against the limit.
            calls_left -= 1
            last_value = target(*args, **kwargs)
        return last_value

    # Resolved after the wrapper is defined (the wrapper looks the name up at
    # call time), so ``fn.wraps`` above receives the argument as given.
    target = to_callable(func)
    return at_most
@fn.curry(2)
def throttle(
    dt: float,
    func: Func,
    policy: Literal["last", "block"] = "last",
    clock: Callable[[], T] = time.monotonic,
    sleep: Callable[[T], None] = time.sleep,
) -> fn:
    """
    Limit the rate of execution of func to once at each ``dt`` seconds.

    When rate-limited, returns the last result returned by func.

    Args:
        dt:
            Interval between actual function calls.
        func:
            Target function.
        policy:
            One of 'last' (default) or 'block'. Control how function behaves
            when called between two successive intervals.

            * 'last': return the last computed value.
            * 'block': block execution until deadline is reached.
        clock:
            The timing function used to compute deadlines. Defaults to
            ``time.monotonic``
        sleep:
            Sleep function used in conjunction with clock. Both functions
            must use the same time units.

    Raises:
        TypeError: if ``policy`` is neither 'last' nor 'block'.

    Example:
        >>> f = sk.throttle(1, (X * 2))
        >>> [f(21), f(14), f(7), f(0)]
        [42, 42, 42, 42]
    """
    # Earliest instant at which func may run again; -inf lets the very first
    # call through unconditionally.
    deadline = -float("inf")
    last_result = None

    if policy == "last":

        def limited(*args, **kwargs):
            nonlocal deadline, last_result
            now = clock()
            if now >= deadline:
                deadline = now + dt
                last_result = func(*args, **kwargs)
            return last_result

    elif policy == "block":

        def limited(*args, **kwargs):
            nonlocal deadline
            now = clock()
            if now < deadline:
                sleep(deadline - now)
                # The next deadline must be measured from the moment func
                # actually runs, not from the instant read before sleeping;
                # otherwise the following call could run almost immediately.
                # ``max`` keeps the guarantee even if ``sleep`` wakes early.
                now = max(clock(), deadline)
            deadline = now + dt
            return func(*args, **kwargs)

    else:  # nocover
        raise TypeError(f"invalid policy: {policy!r}")

    # ``limited`` looks ``func`` up at call time, so rebinding it here is
    # enough for the wrapper to use the converted callable.
    func = to_callable(func)
    return fn.wraps(func)(limited)
class Background:
    """
    Handle for a computation running in a background thread.

    Creating an instance immediately starts a thread that calls ``target()``.
    The return value is stored in ``_output``; an ``Exception`` raised by the
    target is stored in ``_error`` and re-raised by :meth:`get`.
    """

    __slots__ = "_output", "_error", "thread"
    _output: Any

    def __init__(self, target):
        from threading import Thread

        def _real_target():
            try:
                self._output = target()
            except Exception as e:
                self._error = e

        self._error = None
        self.thread = Thread(target=_real_target)
        self.thread.start()

    def __repr__(self):
        if self.thread.is_alive():
            return "Background(...)"
        # Finished: show the captured error if there is one, else the output.
        shown = self._output if self._error is None else self._error
        return f"Background({shown!r})"

    def __call__(self, **kwargs):
        return self.get(**kwargs)

    def get(self, timeout=None, *, default=NOT_GIVEN):
        """
        Return the result of the computation, waiting for it if necessary.

        Args:
            timeout:
                Maximum time to wait, passed to ``Thread.join``. ``None``
                waits until the thread finishes.
            default:
                Value returned if the thread is still running after the
                wait. If omitted, ``TimeoutError`` is raised instead.

        Raises:
            TimeoutError: the thread is still running and no default was
                given.
            Exception: whatever exception the target raised.
        """
        # Fast paths: the outcome is already known, no need to join.
        if hasattr(self, "_output"):
            return self._output
        if self._error is not None:
            raise self._error

        self.thread.join(timeout)

        still_running = self.thread.is_alive()
        if still_running and default is NOT_GIVEN:
            raise TimeoutError
        if still_running:
            return default

        if self._error is not None:
            raise self._error
        return self._output

    def maybe(self) -> "Maybe":  # noqa
        """
        Return Just(result) if the result is available, Nothing otherwise.

        Does not wait for the computation; an exception raised by the target
        propagates to the caller.
        """
        from ..types.maybe import Just, Nothing

        # A private marker distinguishes "not finished" from any real result.
        missing = object()
        outcome = self.get(0, default=missing)
        if outcome is missing:
            return Nothing
        return Just(outcome)

    def result(self) -> "Result":  # noqa
        """
        Wrap the current state of the computation in a Result value.

        Return Err(TimeoutError) if the function has not terminated yet and
        Err(exception) if an exception was raised.
        """
        from ..types.maybe import Err

        try:
            return self.maybe().to_result(TimeoutError)
        except Exception as exc:
            return Err(exc)
def background(func: Func, /, *args, **kwargs) -> Background:
    """
    Start ``func(*args, **kwargs)`` in the background.

    The call returns immediately with a :class:`Background` value that gives
    access to the result of the computation. Calling that value with no
    parameters blocks until the function finishes and returns its result; it
    also accepts the ``timeout`` and ``default`` parameters.

    Args:
        func:
            Function or callable wrapped to support being called in the
            background.
        *args, **kwargs:
            Arguments passed to ``func``.

    Examples:
        >>> fib = lambda n: 1 if n <= 2 else fib(n - 1) + fib(n - 2)
        >>> res = sk.background(fib, 30)  # Do not block execution, return a thunk

        We can inspect partial results. ``res.maybe`` will return Just(value)
        if computation is completed and ``Nothing`` otherwise.

        >>> res = sk.background(fib, 30)
        >>> res.maybe()
        Nothing

        In order to inspect errors or the current state of execution, use
        the result method.

        >>> res = sk.background(fib, 30)
        >>> res.result()
        Err(TimeoutError)

        We can finally force completion using the blocking operation:

        >>> res = sk.background(fib, 30)
        >>> res.get()
        832040
    """
    target = to_callable(func)

    def run():
        # Zero-argument adapter: Background calls its target with no
        # arguments.
        return target(*args, **kwargs)

    return Background(run)
@fn
def error(exc):
    """
    Raise the given exception.

    The argument is first converted with ``to_raisable``; if it is not an
    exception, ValueError(exc) is raised.

    Examples:
        >>> sk.error('some error')
        Traceback (most recent call last):
        ...
        ValueError: some error

    See Also:
        * :func:`raising`: create a function that raises an error instead of
          raising it immediately
    """
    raisable = to_raisable(exc)
    raise raisable
@fn.curry(2)
def catch(exc: Union[Catchable, Dict[Catchable, Any]], func: Func, /, *args, **kwargs):
    """
    Call ``func(*args, **kwargs)`` and handle the exceptions described by exc.

    Args:
        exc:
            Either an exception class (or tuple of classes) or a mapping from
            exception classes to handlers.

            * class / tuple: a matching exception is suppressed and ``None``
              is returned.
            * mapping: the handler registered for the most specific class of
              the raised exception (searched along its MRO) is used. If the
              handler is raisable it is raised, chained to the original
              exception; otherwise it is returned as the result.
        func:
            Function to execute.
        *args, **kwargs:
            Arguments passed to ``func``.

    Raises:
        Any exception raised by ``func`` that is not described by ``exc`` is
        propagated unchanged.

    Examples:
        >>> div = (X / Y)
        >>> print(sk.catch(ZeroDivisionError, div, 1, 0))
        None

        It is possible to map the return value in case of errors to other errors
        or to other values.

        >>> sk.catch({ZeroDivisionError: float('nan')}, div, 1, 0)
        nan
    """
    func = to_callable(func)
    try:
        return func(*args, **kwargs)
    except Exception as e:
        # Plain exception class, or tuple of classes: suppress on match.
        if isinstance(exc, (type, tuple)):
            if isinstance(e, exc):
                return None
            raise

        # Mapping: walk the MRO so the exact class wins, but handlers
        # registered for a base class also apply to its subclasses.
        for cls in type(e).__mro__:
            if cls in exc:
                res = exc[cls]
                break
        else:
            # Nothing registered for this exception: let it propagate as-is
            # instead of masking it with a KeyError from the lookup.
            raise

        if is_raisable(res):
            raise res from e
        return res
@fn.curry(2)
def catching(errors: Catchable, func: Func):
    """
    Similar to catch, but decorates a function rewriting its error handling
    policy.

    Args:
        errors:
            An exception class, a tuple of exception classes, or a mapping
            from exception classes to handlers. A bare class or tuple makes
            the decorated function return ``None`` when one of those
            exceptions is raised. In a mapping, a raisable handler is raised
            (chained to the original exception) and any other value is
            returned as the result.
        func:
            Function to decorate.

    Returns:
        A function that calls ``func`` and applies the error policy. The
        handler for an exception is the one registered for its most specific
        class, searched along the exception's MRO.

    Examples:
        >>> @sk.catching({KeyError: ValueError})
        ... def get_value(name):
        ...     return db[name]  # noqa
    """
    # Normalize every accepted form to a mapping {exception class: handler}.
    if isinstance(errors, type):
        errors = {errors: None}
    elif isinstance(errors, tuple):
        errors = dict.fromkeys(errors)

    def raiser(replacement):
        # Factory, so each handler captures its own replacement exception
        # (avoids the late-binding closure pitfall).
        def raise_replacement(cause):
            raise replacement from cause

        return raise_replacement

    handlers = {
        err: raiser(handler) if is_raisable(handler) else always(handler)
        for err, handler in errors.items()
    }
    exceptions = tuple(handlers)
    func = to_callable(func)

    def runner(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except exceptions as e:
            # ``except`` also matches subclasses of the registered classes,
            # so look the handler up along the MRO, not by exact type.
            for cls in type(e).__mro__:
                if cls in handlers:
                    return handlers[cls](e)
            # Defensive: no handler found, propagate the original error.
            raise

    return quick_fn(runner)
@fn
def result(func, /, *args, **kwargs) -> "Result":
    """
    Execute function and wrap result in a Result type.

    If execution is successful, return Ok(result), if it raises an exception,
    return Err(exception). A return value that already is a Result is
    returned unchanged.

    Args:
        func:
            Function to execute.
        *args, **kwargs:
            Arguments passed to ``func``.

    >>> res = result(very_complex_computation, 42)  # noqa
    >>> res.is_ok  # computation was not successful!
    False
    """
    # The module-level Err/Ok/Result names are None placeholders at runtime
    # (the real import sits behind TYPE_CHECKING), so bind the actual classes
    # locally. NOTE(review): the import is kept function-local on the
    # assumption that a top-level import would be circular -- confirm.
    from ..types.result import Err, Ok, Result

    func = to_callable(func)
    try:
        res = func(*args, **kwargs)
    except Exception as ex:
        return Err(ex)
    else:
        if isinstance(res, Result):
            return res
        return Ok(res)
# noinspection PyShadowingNames
@fn.curry(2)
def retry(n: int, func: Func, *, error: Catchable = Exception, sleep=None) -> fn:
    """
    Create a function that tries to execute func at most n times.

    The first ``n - 1`` attempts suppress the exceptions given by ``error``;
    the last attempt is unguarded, so its exception (if any) reaches the
    caller. This is useful for functions that may fail due to interaction
    with external resources (e.g., fetch data from the network).

    Args:
        n:
            Maximum number of times to execute function
        func:
            Function that may raise errors.
        error:
            Exception or tuple with suppressed exceptions.
        sleep:
            Interval in which it sleeps between attempts.

    Example:
        >>> queue = [111, 7, None, None]
        >>> process = sk.retry(5, lambda n: queue.pop() * n)
        >>> process(6)
        42
    """

    @fn.wraps(func)
    def safe_func(*args, **kwargs):
        # Guarded attempts: n - 1 of them.
        for _ in range(1, n):
            try:
                outcome = func(*args, **kwargs)
            except error:
                if not sleep:
                    continue
                time.sleep(sleep)
            else:
                return outcome
        # Final attempt: any exception propagates to the caller.
        return func(*args, **kwargs)

    # ``safe_func`` looks ``func`` up at call time, so rebinding it here is
    # enough for the wrapper to use the converted callable.
    func = to_callable(func)
    return safe_func