c0fec0de/pyebus

View on GitHub
pyebus/msgdecoder.py

Summary

Maintainability
A
1 hr
Test Coverage
"""EBUS Message Decoding."""
import re

from .const import NA
from .exceptions import UnknownMsgError
from .msg import BrokenMsg, Field, Msg


class MsgDecoder:

    """
    Message Decoder.

    Args:
        msgdefs (MsgDefs): Message Definitions

    The message decoder takes a EBUSD data one-line string and creates the corresponding :any:`Msg` instance.
    The decoder needs to know the actual message definitions.
    """

    _re_decode = re.compile(r"([A-z0-9]+(\.[A-z0-9]+)?) ([^\s]*) (= )?(.*)")

    def __init__(self, msgdefs):
        self.msgdefs = msgdefs

    def decode_line(self, line):
        """
        Decode EBUSD data `line` and return :any:`Msg` instance.

        Raises:
            ValueError: if `line` does not match expected format.
            UnknownMsgError: if `line` is not covered by fields.
        """
        match = self._re_decode.match(line)
        if not match:
            raise ValueError(line)
        circuit, _, name, _, valuestr = match.groups()
        msgdef = self.msgdefs.get(circuit, name)
        if not msgdef:
            raise UnknownMsgError(f"circuit={circuit}, name={name}")
        return self.decode_value(msgdef, valuestr.strip())

    def decode_value(self, msgdef, valuestr):
        """
        Decode message `msgdef` value pair string `valuestr`.


        Returns:
            Msg: message with proper data.
            BrokenMsg: Undecodable message.
        """
        if not valuestr.startswith("ERR: "):
            fields = tuple(self._decodefields(msgdef, valuestr.strip()))
            return Msg(msgdef, fields)

        return BrokenMsg(msgdef, valuestr[len("ERR: ") :].strip())

    @staticmethod
    def _decodefields(msgdef, valuestr):
        if valuestr not in ("no data stored", "nosignal"):
            values = valuestr.split(";")
            fields = []
            for fielddef in msgdef.fields:
                if fielddef.idx is None:
                    continue
                try:
                    value = values[fielddef.idx].strip()
                except IndexError:
                    fieldvalue = NA
                else:
                    try:
                        fieldvalue = fielddef.type_.decode(value)
                    except ValueError:
                        fieldvalue = None
                fields.append(Field(fielddef, fieldvalue))
        else:
            fields = [Field(fielddef, NA) for fielddef in msgdef.fields]
        # virtual fields
        for virtfielddef in msgdef.virtfields:
            virtfieldvalue = virtfielddef.func(fields)
            fields.append(Field(virtfielddef, virtfieldvalue))
        return fields