smok-serwis/smok-client

View on GitHub
smok/pathpoint/orders.py

Summary

Maintainability
A
0 mins
Test Coverage
F
50%
import enum
import logging
import time
import typing as tp
from concurrent.futures import Future

from satella.coding.structures import ReprableMixin
from satella.coding.concurrent import FutureCollection

from .typing import PathpointValueType

__all__ = ['AdviseLevel', 'Disposition', 'Order', 'ReadOrder', 'WriteOrder',
           'WaitOrder', 'MessageOrder', 'Section', 'sections_from_list',
           'SysctlOrder']

logger = logging.getLogger(__name__)


class AdviseLevel(enum.IntEnum):
    """
    Advise level specifies how hard should the device try to execute this command.
    """
    ADVISE = 0  #: Best-effort
    FORCE = 1  #: Nearly guarantees correct delivery, up to blocking the pipeline if need be


class Order:
    """Base class for all orders"""
    __slots__ = ()


class SysctlOrder(Order, ReprableMixin):
    """
    A sysctl order. These are completely user-defined.
    """
    _REPR_FIELDS = 'op_type', 'op_args'
    __slots__ = 'op_type', 'op_args'

    def __init__(self, op_type: str, op_args: str):
        self.op_type = op_type
        self.op_args = op_args

    def __str__(self) -> str:
        return repr(self)

    @classmethod
    def from_json(cls, dct: dict) -> 'SysctlOrder':
        return SysctlOrder(dct['op_type'], dct['op_args'])


class MessageOrder(Order, ReprableMixin):
    """
    A message order. Best executed with
    :meth:`smok.client.SMOKDevice._execute_message_order`
    """
    _REPR_FIELDS = 'uuid', 'times_retry'
    __slots__ = 'uuid', 'times_retry'

    def __init__(self, uuid: str):
        self.uuid = uuid
        self.times_retry = 3

    def __str__(self) -> str:
        return repr(self)

    def fail(self) -> bool:
        self.times_retry -= 1
        return bool(self.times_retry)

    @classmethod
    def from_json(cls, dct: dict) -> 'MessageOrder':
        return MessageOrder(dct['uuid'])


class WaitOrder(Order, ReprableMixin):
    """
    Order to wait a given amount of seconds

    :param period: seconds to wait
    """
    _REPR_FIELDS = ('period',)
    __slots__ = ('period',)

    def __init__(self, period: float):
        self.period = period

    @classmethod
    def from_json(cls, dct: dict) -> 'WaitOrder':
        return WaitOrder(dct['time'])

    def __str__(self) -> str:
        return repr(self)


class WriteOrder(Order, ReprableMixin):
    """
    Order to write a target value to target pathpoint

    :param pathpoint: name of pathpoint to write to
    :param value: value to write
    :param advise: advise level
    :param stale_after: optional timestamp in seconds, after which this write
        will be discarded
    """
    _REPR_FIELDS = 'pathpoint', 'value', 'advise', 'stale_after'
    __slots__ = 'pathpoint', 'value', 'advise', 'stale_after', 'repeat_count'

    def __init__(self, pathpoint: str, value: PathpointValueType, advise: AdviseLevel,
                 stale_after: tp.Optional[float] = None):
        self.pathpoint = pathpoint
        self.value = value
        self.advise = advise
        self.repeat_count = 10 if advise == AdviseLevel.FORCE else 1
        self.stale_after = stale_after

    def is_valid(self) -> bool:
        if self.stale_after is None:
            return True
        return self.stale_after > time.time()

    def __str__(self) -> str:
        return repr(self)

    def fail(self) -> bool:
        """
        Fail this order.

        Return whether to requeue it
        """
        self.repeat_count -= 1
        return bool(self.repeat_count)

    @classmethod
    def from_json(cls, dct: dict) -> 'WriteOrder':
        return WriteOrder(dct['path'], dct['value'], AdviseLevel(dct.get('advise', 0)),
                          dct.get('stale_after'))


class ReadOrder(Order, ReprableMixin):
    """
    An order to read a pathpoint

    :param pathpoint: pathpoint to read
    :param advise: advise level
    """
    _REPR_FIELDS = 'pathpoint', 'advise'

    def __init__(self, pathpoint: str, advise: AdviseLevel):
        self.pathpoint = pathpoint
        self.advise = advise
        self.repeat_count = 3 if AdviseLevel.ADVISE else 20

    def __str__(self) -> str:
        return repr(self)

    @classmethod
    def from_json(cls, dct: dict) -> 'ReadOrder':
        return ReadOrder(dct['path'], AdviseLevel(dct.get('advise', 0)))

    def fail(self) -> bool:
        """
        Fail this order.

        Return whether to requeue it
        """
        self.repeat_count -= 1
        return bool(self.repeat_count)


def orders_from_list(lst: tp.List[dict]) -> tp.List[Order]:
    orders = []
    for order in lst:
        try:
            order_type = order['type']
        except KeyError:
            logger.error('Received order (%s) without a type, ignoring', order)
            continue

        if order_type == 'message':
            o = MessageOrder.from_json(order)
        elif order_type == 'read':
            o = ReadOrder.from_json(order)
        elif order_type == 'sysctl':
            o = SysctlOrder.from_json(order)
        elif order_type == 'wait':
            o = WaitOrder.from_json(order)
        elif order_type == 'write':
            o = WriteOrder.from_json(order)
        else:
            logger.error('Received unknown order type of %s, ignoring', order_type)
            continue

        orders.append(o)
    return orders


class Disposition(enum.IntEnum):
    """
    A joinable quality of the section
    """
    JOINABLE = 0  #: this section can be joined to neighbouring JOINABLE sections
    CANNOT_JOIN = 1  #: this section cannot be joined to any sections


class Section(ReprableMixin):
    """
    A collection of orders.

    :param orders: a list of orders
    :param disposition: if Disposition.JOINABLE then this section can be joined with
        other sections. If Disposition.CANNOT_JOIN then all orders from this section
        will be executed before proceeding to next one

    :ivar future: a future that completes upon this section being finished.
        If this future is cancelled, and section did not start executing, it will be.
    """
    _REPR_FIELDS = 'orders', 'disposition'
    __slots__ = 'orders', 'disposition', 'future', 'cancelled'

    def __init__(self, orders: tp.List[Order] = None,
                 disposition: Disposition = Disposition.JOINABLE):

        self.future = FutureCollection([Future()])
        self.orders = orders or []
        self.disposition = disposition
        self.cancelled = False

    def __str__(self) -> str:
        return repr(self)

    def mark_as_done(self) -> None:
        """
        Should be invoked by your custom executor when executing this section completes.
        """
        self.future.set_result(None)

    def cancel(self) -> None:
        """
        Cancel the order
        """
        # we still need to execute the callbacks, as they might be holding an AMQP message
        # somewhere...
        self.cancelled = True

    def result(self, timeout: tp.Optional[float] = None):
        self.future.result(timeout)

    def __bool__(self) -> bool:
        return bool(self.orders)

    @classmethod
    def from_json(cls, dct: dict):
        return Section(orders_from_list(dct['orders']), Disposition(dct.get('disposition', 0)))

    def __iadd__(self, other: tp.Union[Order, 'Section', tp.Sequence['Section']]) -> 'Section':
        if isinstance(other, Order):
            self.orders.append(other)
        elif isinstance(other, tp.Sequence):
            self.orders.extend(other)
        else:
            self.orders.extend(other.orders)
            self.future += other.future
        return self

    def is_joinable(self) -> bool:
        return self.disposition == Disposition.JOINABLE

    def mark_as_being_executed(self) -> bool:
        """
        Mark this section as executed right now

        :return: whether is should be executed (False if cancelled)
        """
        self.future.set_running_or_notify_cancel()
        return not self.cancelled

    def max_wait(self) -> tp.Optional[float]:
        wait = None
        for order in (or_ for or_ in self.orders if isinstance(or_, WaitOrder)):
            if wait is None:
                wait = order.period
            else:
                if wait < order.period:
                    wait = order.period
        return wait


def sections_from_list(lst: tp.List[dict]) -> tp.List[Section]:
    return [Section.from_json(section) for section in lst]