c0fec0de/pyebus

View on GitHub
pyebus/msgdef.py

Summary

Maintainability
A
3 hrs
Test Coverage
"""Message Defintions."""
import collections
import copy

from anytree import NodeMixin

from .const import AUTO
from .util import repr_

_MsgDef = collections.namedtuple("_MsgDef", "circuit name read prio write update setprio")


class MsgDef(_MsgDef, NodeMixin):

    """
    Message Definition.

    A message is defined by `circuit` and `name` and has a fields

    Args:
        circuit (str): Circuit Name
        name (str): Message Name
        children (tuple): Field definitions. :any:`FieldDef` and :any:`VirtFieldDef`.

    Keyword Args:
        read (bool): Message intend to be read
        prio (int): Message Polling Priority on bus.
        write (bool): Message intend to be written
        updated (bool): Message intent to be seen automatically on every value change
        setprio: Message polling priority to be set. Integer `1-9 or `A` for automatic.

    >>> from pyebus import MsgDef, types
    >>> m = MsgDef('circuit', 'name', [
    ...     FieldDef(0, 'temp', types.IntType(-127, 128)),
    ...     FieldDef(1, 'name', types.StrType(10)),
    ...     VirtFieldDef('virt', types.StrType(), lambda msg: f'{msg.name}: {msg.temp}'),
    ... ], read=True)
    >>> m  # doctest: +ELLIPSIS
    MsgDef('circuit', 'name', (FieldDef(0, 'temp', IntType(... VirtFieldDef('virt', StrType())), read=True)
    >>> m.ident
    'circuit/name'
    >>> m.children
    (FieldDef(0, 'temp', IntType(-127, 128)), FieldDef(1, 'name', StrType(length=10)), VirtFieldDef('virt', StrType()))
    >>> m.fields
    (FieldDef(0, 'temp', IntType(-127, 128)), FieldDef(1, 'name', StrType(length=10)))
    >>> m.virtfields
    (VirtFieldDef('virt', StrType()),)
    >>> m.access
    'r----'

    :any:`MsgDef`, :any:`FieldDef` and :any:`VirtFieldDef` form a tree structure

    >>> from anytree import RenderTree
    >>> print(RenderTree(m))  # doctest: +ELLIPSIS
    MsgDef('circuit', 'name', (FieldDef(0, 'temp', IntType(-127, 128)),..., StrType())), read=True)
    ├── FieldDef(0, 'temp', IntType(-127, 128))
    ├── FieldDef(1, 'name', StrType(length=10))
    └── VirtFieldDef('virt', StrType())

    Similar object can be easily created by:

    >>> m.replace(children=m.fields[1:2])
    MsgDef('circuit', 'name', (FieldDef(1, 'name', StrType(length=10)),), read=True)
    """

    __slots__ = tuple()

    def __new__(cls, circuit, name, children, read=False, prio=None, write=False, update=False, setprio=None):
        if not read:
            prio = None
        msgdef = _MsgDef.__new__(cls, circuit, name, read, prio, write, update, setprio)
        if children:
            msgdef.children = children
        return msgdef

    def __repr__(self):
        args = (self.circuit, self.name, self.children)
        kwargs = [
            ("read", self.read, False),
            ("prio", self.prio, None),
            ("write", self.write, False),
            ("update", self.update, False),
            ("setprio", self.setprio, None),
        ]
        return repr_(self, args, kwargs)

    def __hash__(self):
        return hash(
            (self.circuit, self.name, self.children, self.read, self.prio, self.write, self.update, self.setprio)
        )

    def __eq__(self, other):
        if self.__class__ is other.__class__:
            return (
                self.circuit,
                self.name,
                self.children,
                self.read,
                self.prio,
                self.write,
                self.update,
                self.setprio,
            ) == (
                other.circuit,
                other.name,
                other.children,
                other.read,
                other.prio,
                other.write,
                other.update,
                other.setprio,
            )

        return NotImplemented

    @property
    def fields(self):
        """Fields."""
        return tuple(child for child in self.children if isinstance(child, FieldDef))

    @property
    def virtfields(self):
        """Calulated Fields."""
        return tuple(child for child in self.children if isinstance(child, VirtFieldDef))

    @property
    def ident(self):
        """Identifier."""
        return f"{self.circuit}/{self.name}"

    @property
    def access(self):
        """Message Access."""
        read = "r" if self.read else "-"
        write = "w" if self.write else "-"
        update = "u" if self.update else "-"
        setprio = str(self.setprio) if self.setprio else "-"
        prio = str(self.prio) if self.prio else "-"
        return "".join((read, write, update, setprio, prio))

    def join(self, msgdef):
        """Return Joined Message Definition."""
        if (self.circuit, self.name, self.children) == (msgdef.circuit, msgdef.name, msgdef.children):
            return self.replace(
                read=self.read or msgdef.read,
                prio=self.prio or msgdef.prio,
                write=self.write or msgdef.write,
                update=self.update or msgdef.update,
                setprio=self.setprio or msgdef.setprio,
            )

        return None

    def replace(self, **kwargs):
        """Create copy with updated attributes."""
        attrs = self._asdict()
        attrs["children"] = self.children
        attrs.update(kwargs)
        # Take a copy of all children, to ensure proper tree relation
        attrs["children"] = tuple(copy.copy(fielddef) for fielddef in attrs["children"])
        return MsgDef(**attrs)


_FieldDef = collections.namedtuple("_FieldDef", "idx name type_ unit comment")


class AbstractFieldDef(_FieldDef, NodeMixin):

    """
    Abstract Field Definition.

    Args:
        idx (str): Index within Message
        name (str): Unique name (as `name` may be used multiple times by ebus)
        type_ (Type): Type

    Keyword Args:
        unit (str): Unit of the field value
        comment (str): Comment.
    """

    __slots__ = tuple()

    def __new__(cls, idx, name, type_, unit=None, comment=None):
        return _FieldDef.__new__(cls, idx, name, type_, unit or None, comment or None)

    def __repr__(self):
        args = (self.idx, self.name, self.type_)
        kwargs = [
            ("unit", self.unit, None),
            ("comment", self.comment, None),
        ]
        return repr_(self, args, kwargs)

    def _pre_detach(self, parent):
        # it is forbidden to remove fields from their message - create new one
        assert False, f"{self!r} is already used by {parent!r}"  # pragma: no cover

    @property
    def msgdef(self):
        """Message Definition."""
        return self.parent

    @property
    def ident(self):
        """Identifier."""
        return f"{self.msgdef.ident}/{self.name}" if self.msgdef else None

    def __copy__(self):
        return FieldDef(idx=self.idx, name=self.name, type_=self.type_, unit=self.unit, comment=self.comment)


class FieldDef(AbstractFieldDef):

    """
    Field Definition.

    Args:
        idx (str): Index within Message
        name (str): Unique name (as `name` may be used multiple times by ebus)
        type_ (Type): Type

    Keywords Args:
        unit (str): Unit of the field value
        comment (str): Comment.
    """


class VirtFieldDef(AbstractFieldDef):
    """
    Virtual Field Definition.

    Args:
        name (str): Unique name (as `name` may be used multiple times by ebus)
        type_ (Type): Type
        func: Function to create value.

    Keywords Args:
        unit (str): Unit of the field value
        comment (str): Comment.
    """

    def __new__(cls, name, type_, func, unit=None, comment=None):
        obj = AbstractFieldDef.__new__(cls, None, name, type_, unit or None, comment or None)
        obj.func = func
        return obj

    def __repr__(self):
        return repr_(self, (self.name, self.type_))

    def __copy__(self):
        # pylint: disable=E1101
        return VirtFieldDef(name=self.name, type_=self.type_, func=self.func, unit=self.unit, comment=self.comment)


def resolve_prio(msgdef, setprio=AUTO):
    """
    Resolve priority specification.

    Integer values are just passed through.

    >>> resolve_prio(MsgDef('circuit', 'name', [], read=True), 1)
    1
    >>> resolve_prio(MsgDef('circuit', 'name', [], read=True), 9)
    9

    The AUTO option, sets read-only messages to prio 1.

    >>> resolve_prio(MsgDef('circuit', 'name', [], read=True), AUTO)
    1

    All other readable messages get prio 2

    >>> resolve_prio(MsgDef('circuit', 'name', [], read=True, write=True), AUTO)
    2
    >>> resolve_prio(MsgDef('circuit', 'name', [], read=True, update=True), AUTO)
    2

    Non-readable messages have no prio.

    >>> resolve_prio(MsgDef('circuit', 'name', [], write=True), AUTO)
    """
    if msgdef.read:
        if setprio == AUTO:
            # Prio is only available on read messages.
            if msgdef.update or msgdef.write:
                # we just want to ensure that we do not miss any value
                setprio = 2
            else:
                setprio = 1
        return setprio

    return None