pyebus/msgdefdecoder.py
"""
Message Definition Decoder.
EBUS message definitions are specified at
https://github.com/john30/ebusd/wiki/4.1.-Message-definition#message-definition .
The function :any:`decode_msgdef` converts a EBUS message defintion string into a :any:`MsgDef`.
"""
import collections
import re
from .msgdef import FieldDef, MsgDef
from .typedecoder import decode_type
from .types import EnumType
from .virtfielddef import iter_virtfielddefs
def decode_msgdef(line):
"""
Decode Message and Field Definition retrieved from ebusd.
The EBUSD command `find -a -F type,circuit,name,fields` retrieves
all message and field definitions of all known and connected devices.
The resulting lines are decoded by this method and create a proper
:any:`MsgDef` instance per `line`.
>>> m = decode_msgdef('r,mc.4,OtShutdownLimit,temp,s,UCH,,°C,"text, text"')
>>> m.circuit, m.name, m.read, m.prio, m.write, m.update
('mc.4', 'OtShutdownLimit', True, None, False, False)
>>> m.children
(FieldDef(0, 'temp', IntType(0, 254), unit='°C', comment='text, text'),)
>>> m = decode_msgdef('w,ui,TempIncrease,temp,m,D2C,,°C,Temperatur')
>>> m.circuit, m.name, m.read, m.prio, m.write, m.update
('ui', 'TempIncrease', False, None, True, False)
>>> m.children
(FieldDef(0, 'temp', IntType(-2047.9, 2047.9, divider=16), unit='°C', comment='Temperatur'),)
"""
try:
values = _split(line)
# Workaround for definitions of internal messages like
# w,broadcast,id,
if len(values) == 4 and values[3] == "":
values = values[:3]
type_, circuit, name = values[:3] # pylint: disable=W0632
read, prio, write, update = decodetype(type_)
children = _decodefields(values[3:])
except ValueError:
raise ValueError(f"Invalid message definition {line!r}") from None
for child in iter_virtfielddefs(children):
children.append(child)
return MsgDef(circuit, name, tuple(children), read, prio, write, update)
def _split(line):
values = []
regex = re.compile(r'"(([^"]|"")+)"(,|$)|([^\,]*)(,|$)')
for mat in regex.finditer(line):
groups = mat.groups()
values.append(groups[0].replace('""', '"') if groups[0] else groups[3])
# we don't want the empty match at the end of line if we already
# consumed a non-empty entry after the last colon
if (groups[2] or groups[4] or "") == "":
break
return values
def decodetype(type_):
"""
Decode Type.
>>> decodetype('r')
(True, None, False, False)
>>> decodetype('w')
(False, None, True, False)
>>> decodetype('u')
(False, None, False, True)
"""
reg = re.compile(r"(r)([1-9]?)")
mat = reg.match(type_)
if mat:
read = mat.group(1) is not None
prio = int(mat.group(2)) if mat.group(2) else None
else:
read, prio = False, None
write = "w" in type_
update = not read and len(type_) > (1 if write else 0)
return read, prio, write, update
def _decodefields(values):
if len(values) % 6 in (0, 3, 4, 5):
chunks = _chunks(values, 6)
return list(_createfields(chunks))
raise ValueError()
def _createfields(chunks):
if chunks:
# determine duplicate names
fields = []
dups = collections.defaultdict(lambda: -1)
for chunk in chunks:
name, _, datatype = chunk[:3]
if not datatype.startswith("IGN"):
fields.append(chunk)
dups[name] += 1
# create fields
cnts = collections.defaultdict(lambda: 0)
for idx, field in enumerate(fields):
name = field[0]
if dups[name]:
cnt = cnts[name]
cnts[name] = cnt + 1
name = f"{name}.{cnt}"
yield _createfield(idx, name, *field)
def _createfield(idx, name, _, __, datatype, dividervalues=None, unit=None, comment=None):
if dividervalues and "=" in dividervalues:
type_ = EnumType(tuple(pair.split("=", 1)[1] for pair in dividervalues.split(";")))
else:
ebustype = datatype.split(",")[0]
if dividervalues:
divider = float(dividervalues)
if divider < 0:
divider = 1 / -divider
else:
divider = None
type_ = decode_type(ebustype, divider)
return FieldDef(idx, name, type_, unit, comment)
def _chunks(list_or_tuple, maxsize):
return [list_or_tuple[i : i + maxsize] for i in range(0, len(list_or_tuple), maxsize)]