pyebus/ebus.py
"""Pythonic EBUS Representation."""
import asyncio
import collections
import logging
from .circuitinfodecoder import decode_circuitinfos
from .connection import CommandError, Connection
from .const import DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT, OK
from .exceptions import UnknownMsgError
from .msg import BrokenMsg, filter_msg
from .msgdecoder import MsgDecoder
from .msgdef import resolve_prio
from .msgdefdecoder import decode_msgdef
from .msgdefs import MsgDefs
from .util import repr_
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# EBUSD command used to list message definitions; sent by both
# `Ebus.async_wait_scancompleted` (to count lines) and `Ebus.async_load_msgdefcodes` (to store them).
_CMD_FINDMSGDEFS = "find -a -F type,circuit,name,fields"
class Ebus:
    """
    Pythonic EBUS Representation.

    The EBUS handle, using one :any:`Connection` to an EBUSD instance.
    One EBUSD server can handle multiple :any:`Ebus` instances.

    Keyword Args:
        host (str): EBUSD host
        port (int): EBUSD port
        timeout (int): Connection Timeout on connect and write.
        scaninterval (int): EBUSD scan - check interval in seconds.
                            Falsy values select `DEFAULT_SCANINTERVAL`.
        scans (int): EBUSD scan - number of intervals.
                     Falsy values select `DEFAULT_SCANS`.
        circuitinfos (list): List with :any:`CircuitInfo` instances
        msgdefcodes (list): EBUSD Message Definition Codes
        msgdefs (MsgDefs): Message Definitions
    """

    # pylint: disable=R0902

    __slots__ = (
        "connection",
        "scaninterval",
        "scans",
        "msgdefcodes",
        "_msgdecoder",
        "_circuitinfos",
        "_circuitinfomap",
    )

    # Connection factory; a class attribute so that subclasses can substitute another connector.
    CONNECTOR = Connection
    DEFAULT_SCANINTERVAL = 10
    DEFAULT_SCANS = 3

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        host=DEFAULT_HOST,
        port=DEFAULT_PORT,
        timeout=DEFAULT_TIMEOUT,
        scaninterval=None,
        scans=None,
        circuitinfos=None,
        msgdefcodes=None,
        msgdefs=None,
    ):
        self._circuitinfomap = {}
        self.connection = self.CONNECTOR(host=host, port=port, autoconnect=True, timeout=timeout)
        self.scaninterval = scaninterval or self.DEFAULT_SCANINTERVAL
        self.scans = scans or self.DEFAULT_SCANS
        self.msgdefcodes = msgdefcodes or []
        self._msgdecoder = MsgDecoder(msgdefs or MsgDefs())
        # Goes through the property setter, which also (re)builds `_circuitinfomap`.
        self.circuitinfos = circuitinfos or []
        _LOGGER.info(repr(self))

    def __repr__(self):
        return repr_(
            self,
            kwargs=(
                ("host", self.host, DEFAULT_HOST),
                ("port", self.port, DEFAULT_PORT),
                ("timeout", self.timeout, DEFAULT_TIMEOUT),
                ("scaninterval", self.scaninterval, self.DEFAULT_SCANINTERVAL),
                ("scans", self.scans, self.DEFAULT_SCANS),
            ),
        )

    @property
    def ident(self):
        """Ident, formatted as `host:port`."""
        return f"{self.host}:{self.port}"

    @property
    def host(self):
        """Host Name or IP (taken from the connection)."""
        return self.connection.host

    @property
    def port(self):
        """Port (taken from the connection)."""
        return self.connection.port

    @property
    def timeout(self):
        """Timeout (taken from the connection)."""
        return self.connection.timeout

    @property
    def circuitinfos(self):
        """
        Circuit Informations :any:`CircuitInfo`.

        This property is writeable. Assigned iterables are stored as tuple.
        """
        return self._circuitinfos

    @circuitinfos.setter
    def circuitinfos(self, circuitinfos):
        self._circuitinfos = tuple(circuitinfos)
        # Lookup table used by `get_circuitinfo`.
        self._circuitinfomap = {circuitinfo.circuit: circuitinfo for circuitinfo in self._circuitinfos}

    def get_circuitinfo(self, circuit):
        """Return :any:`CircuitInfo` for `circuit` or `None` if the circuit is not known."""
        return self._circuitinfomap.get(circuit)

    @property
    def msgdefs(self):
        """
        Message Definitions :any:`MsgDefs`.

        This property is writeable.
        """
        return self._msgdecoder.msgdefs

    @msgdefs.setter
    def msgdefs(self, msgdefs):
        self._msgdecoder.msgdefs = msgdefs

    def __copy__(self):
        # `type(self)` instead of a hard-coded `Ebus` keeps subclasses (e.g. with another
        # `CONNECTOR`) intact. The copy is shallow: `circuitinfos`, `msgdefcodes` and
        # `msgdefs` are handed over as they are.
        return type(self)(
            self.host,
            self.port,
            timeout=self.timeout,
            scaninterval=self.scaninterval,
            scans=self.scans,
            circuitinfos=self.circuitinfos,
            msgdefcodes=self.msgdefcodes,
            msgdefs=self.msgdefs,
        )

    async def async_wait_scancompleted(self):
        """
        Wait until EBUSD device scan is completed.

        EBUSD scans the bus infrastructure at startup by default.
        Devices and messages are detected during this time.
        This method waits until no new messages are found.
        It checks every `scaninterval` seconds.
        The number of messages has to be stable for `scans` times.

        Raises:
            ConnectionRefusedError: If connection cannot be established
            ConnectionError: On connection breakdown.
            CommandError: If command failed
            Shutdown: On EBUSD shutdown.
        """
        cnts = []
        while True:
            await self.connection.async_request(_CMD_FINDMSGDEFS)
            # Only the number of response lines matters, not their content.
            cnt = 0
            async for _ in self.connection.async_read():
                cnt += 1
            cnts.append(cnt)
            # Stable, if the latest `scans` counts are all identical.
            stable = len(cnts) >= self.scans and all(prev == cnt for prev in cnts[-self.scans : -1])
            if stable:  # pragma: no cover
                # not properly collected during coverage analysis
                break
            await asyncio.sleep(self.scaninterval)

    async def async_load_msgdefs(self):
        """
        Load Message Definitions from EBUSD.

        Alias for :any:`async_load_msgdefcodes` and :any:`decode_msgdefcodes`.

        Raises:
            ConnectionRefusedError: If connection cannot be established
            ConnectionError: On connection breakdown.
            CommandError: If command failed
            Shutdown: On EBUSD shutdown.
        """
        await self.async_load_msgdefcodes()
        self.decode_msgdefcodes()

    async def async_load_msgdefcodes(self, add=False):
        """
        Load EBUS Message Definition Codes from EBUSD and store to :any:`msgdefcodes`.

        Undecodable lines are logged and skipped.
        Definitions of circuits starting with `scan` are skipped.

        Keyword Args:
            add (bool): Append new codes to the existing :any:`msgdefcodes` instead of
                        replacing them.

        Raises:
            ConnectionRefusedError: If connection cannot be established
            ConnectionError: On connection breakdown.
            CommandError: If command failed
            Shutdown: On EBUSD shutdown.
        """
        _LOGGER.info("load_msgdefcodes()")
        if add:
            msgdefcodes = self.msgdefcodes
        else:
            msgdefcodes = self.msgdefcodes = []
        # Companion set for O(1) duplicate detection; the list keeps the order.
        known = set(msgdefcodes)
        await self.connection.async_request(_CMD_FINDMSGDEFS)
        async for line in self.connection.async_read():
            line = line.strip()
            try:
                msgdef = decode_msgdef(line)
            except ValueError as exc:
                _LOGGER.warning("Cannot decode message definition %r (%s)", line, exc)
                continue
            if msgdef and not msgdef.circuit.startswith("scan") and line not in known:
                known.add(line)
                msgdefcodes.append(line)

    def decode_msgdefcodes(self):
        """
        Decode `msgdefcodes` and use as `msgdefs`.

        The current `msgdefs` are cleared and refilled, sorted by circuit and name.
        Undecodable codes are logged and skipped.
        """
        _LOGGER.info("decode_msgdefcodes()")
        # Decode
        msgdefs = []
        for msgdefcode in self.msgdefcodes:
            try:
                msgdefs.append(decode_msgdef(msgdefcode))
            except ValueError as exc:
                _LOGGER.warning("Cannot decode message definition %r (%s)", msgdefcode, exc)
        # Sort
        self.msgdefs.clear()
        for msgdef in sorted(msgdefs, key=lambda msgdef: (msgdef.circuit, msgdef.name)):
            self.msgdefs.add(msgdef)

    async def async_read(self, msgdef, ttl=None, setprio=None):
        """
        Read Message.

        Args:
            msgdef (MsgDef): Message Definition

        Keyword Args:
            ttl (int): Time-to-live. Maximum age of read value in seconds.
            setprio: Priority `1-9` or `A` for automatic.

        Returns:
            Msg: Message

        Raises:
            ValueError: on decoder error
            ConnectionRefusedError: If connection cannot be established
            ConnectionError: On connection breakdown.
            CommandError: If command failed
            Shutdown: On EBUSD shutdown.
        """
        _LOGGER.info("read(%r, ttl=%r, setprio=%r)", msgdef, ttl, setprio)
        if setprio:
            msgdef = msgdef.replace(setprio=resolve_prio(msgdef, setprio))
        return await self._async_read(msgdef, ttl)

    async def async_write(self, msgdef, value, ttl=0):
        """
        Write Message.

        If `msgdef` just contains a subset of fields, the `value` is applied only to these, by
        running a read-modify-write operation.

        Args:
            msgdef (MsgDef): Message Definition
            value: Value to be written. It is encoded by the type of every field in `msgdef`.

        Keyword Args:
            ttl (int): Time-to-live. Maximum age of read value in seconds.
                       Just needed in case of a Read-Modify-Write.

        Raises:
            ValueError: If the message is not writeable, not known or
                        (in case of a Read-Modify-Write) not readable.
            ConnectionRefusedError: If connection cannot be established
            ConnectionError: On connection breakdown.
            CommandError: If the read of a Read-Modify-Write or the write itself failed.
        """
        _LOGGER.info("write(%r, value=%r, ttl=%r)", msgdef, value, ttl)
        if not msgdef.write:
            raise ValueError(f"Message is not writeable {msgdef}")
        fullmsgdef = self.msgdefs.get(msgdef.circuit, msgdef.name)
        # Defensive: fail with a clear message instead of an AttributeError below,
        # in case the lookup yields `None` for an unknown message.
        if fullmsgdef is None:
            raise ValueError(f"Message is not known {msgdef}")
        if len(fullmsgdef.children) != len(msgdef.children):
            # Read
            if not msgdef.read:
                raise ValueError(f"Message is not read-modify-writable {msgdef}")
            await self.connection.async_request("read", msgdef.name, c=msgdef.circuit, m=ttl)
            line = await self.connection.async_readresp(check=False)
            # Never patch and write back an error response.
            if line.startswith("ERR:"):
                raise CommandError(line)
            values = line.split(";")
            # Modify
            for fielddef in msgdef.fields:
                encvalue = fielddef.type_.encode(value)
                values[fielddef.idx] = str(encvalue)
        else:
            values = [str(fielddef.type_.encode(value)) for fielddef in msgdef.fields]
        # Write
        await self.connection.async_request("write", msgdef.name, ";".join(values), c=msgdef.circuit)
        resp = await self.connection.async_readresp()
        if resp != "done":
            raise CommandError(resp)

    async def async_listen(self, msgdefs=None):
        """
        Listen to EBUS for messages.

        Listen to automatically updated messages and EBUSDs polling mechanism.

        Keyword Args:
            msgdefs (MsgDefs): Message definitions to be listened, other messages are ignored.

        Yields:
            Msg: Messages

        Raises:
            ConnectionRefusedError: If connection cannot be established
            ConnectionError: On connection breakdown.
            CommandError: If command failed
            Shutdown: On EBUSD shutdown.
        """
        _LOGGER.info("listen(msgdefs=%r)", msgdefs)
        async for msg in self._async_listen(msgdefs):
            yield msg

    async def async_observe(self, msgdefs=None, ttl=None, setprio=None):
        """
        Observe `msgdefs` messages.

        Explicitly read all messages and then listen to automatically updated messages and
        EBUSDs polling mechanism.

        Keyword Args:
            msgdefs (MsgDefs): Message definitions to be observed, other messages are ignored.
                               Defaults to all known message definitions.
            ttl (int): Time-to-live. Maximum age of read value in seconds.
            setprio: Priority `1-9` or `A` for automatic.

        Yields:
            Msg: Message

        Raises:
            ConnectionRefusedError: If connection cannot be established
            ConnectionError: On connection breakdown.
            CommandError: If command failed
            Shutdown: On EBUSD shutdown.
        """
        _LOGGER.info("observe(msgdefs=%r, ttl=%r, setprio=%r)", msgdefs, ttl, setprio)
        msgdefs = msgdefs or self.msgdefs
        # Last valid message per message ident; unknown idents read as `None`.
        data = collections.defaultdict(lambda: None)

        # read all
        for msgdef in msgdefs:
            if msgdef.read:
                if setprio:
                    msgdef = msgdef.replace(setprio=resolve_prio(msgdef, setprio))
                msg = await self._async_read(msgdef, ttl=ttl)
                _LOGGER.debug("observe-read: %r", msg)
                msg = filter_msg(msg, msgdefs)
                if msg:
                    # Invalid messages are forwarded, but not remembered.
                    if msg.valid:
                        data[msgdef.ident] = msg
                    yield msg
            elif msgdef.update:
                data[msgdef.ident] = None

        # find new values (which got updated while we were reading)
        await self.connection.async_request("find -d")
        async for line in self.connection.async_read(check=False):
            msg = self._decode_msg(line)
            _LOGGER.debug("observe-find: %r", msg)
            msg = filter_msg(msg, msgdefs)
            # Forward only what differs from the value already yielded above.
            if msg and msg != data[msg.msgdef.ident]:
                yield msg
                data[msg.msgdef.ident] = msg

        # listen
        async for msg in self._async_listen(msgdefs):
            _LOGGER.debug("observe-listen: %r", msg)
            yield msg

    async def async_get_state(self):
        """
        Return state string.

        Connection and command errors are reported through the returned string
        instead of an exception.

        Returns:
            str: OK or any error value.
        """
        _LOGGER.info("get_state()")
        return await self._async_get_state()

    async def _async_get_state(self):
        """Return `OK` on acquired signal, otherwise the EBUSD state or a connection error text."""
        try:
            await self.connection.async_request("state")
            state = await self.connection.async_readresp(check=False)
            if state.startswith("signal acquired"):
                return OK
            return state
        except (ConnectionError, CommandError, ConnectionRefusedError):
            return "no ebusd connection"

    async def async_is_online(self):
        """
        Return `True` if everything is fine.

        Connection and command errors result in `False` instead of an exception.

        Returns:
            bool
        """
        _LOGGER.info("is_online()")
        state = await self._async_get_state()
        return state == OK

    async def async_get_info(self):
        """
        Return EBUSD meta information.

        Every response line is split at the first colon into name and value.
        Lines without colon are logged and skipped.

        Returns:
            dict: Meta information according to metainfo_

        .. _metainfo: https://github.com/john30/ebusd/wiki/3.1.-TCP-client-commands#info
        """
        _LOGGER.info("get_info()")
        info = {}
        await self.connection.async_request("info")
        async for line in self.connection.async_read():
            name, sep, value = line.partition(":")
            if not sep:
                _LOGGER.warning("Cannot decode info line %r", line)
                continue
            info[name.strip()] = value.strip()
        return info

    async def async_load_circuitinfos(self):
        """Load EBUSD Circuit Information and store in :any:`circuitinfos`."""
        _LOGGER.info("load_circuitinfos()")
        await self.connection.async_request("info")
        lines = [line async for line in self.connection.async_read()]
        self.circuitinfos = decode_circuitinfos(lines)

    async def async_cmd(self, cmd, infinite=False, check=False):
        """
        Send EBUS_Command_ `cmd` to EBUSD and Receive Response.

        .. _EBUS_Command: https://github.com/john30/ebusd/wiki/3.1.-TCP-client-commands

        Args:
            cmd (str): Command String

        Keyword Args:
            infinite (bool): Do not stop at first empty line.
            check (bool): Abort on `ERR:` string in response.

        Yields:
            str: response

        Raises:
            ConnectionRefusedError: If connection cannot be established
            ConnectionError: On connection breakdown.
            CommandError: If command failed and `check==True`.
            Shutdown: On EBUSD shutdown.
        """
        # `cmd` is passed as logging argument, as it may contain `%` characters.
        _LOGGER.info("cmd(%r, infinite=%r, check=%r)", cmd, infinite, check)
        await self.connection.async_write(cmd)
        async for line in self.connection.async_read(infinite=infinite, check=check):
            yield line

    async def _async_read(self, msgdef, ttl=None):
        """Read `msgdef` from EBUSD and decode the response; command errors yield a `BrokenMsg`."""
        try:
            await self.connection.async_request("read", msgdef.name, c=msgdef.circuit, p=msgdef.setprio, m=ttl)
            line = await self.connection.async_readresp(check=False)
        except CommandError as exc:  # pragma: no cover
            return BrokenMsg(msgdef, str(exc))
        return self._msgdecoder.decode_value(msgdef, line)

    async def _async_listen(self, msgdefs):
        """Start EBUSD listen mode and yield every decoded message which passes `filter_msg`."""
        await self.connection.async_request("listen")
        resp = await self.connection.async_readresp()
        if resp != "listen started":
            raise CommandError(f"Listen could not be started: {resp}")
        async for line in self.connection.async_read(check=False):
            msg = self._decode_msg(line)
            msg = filter_msg(msg, msgdefs)
            if msg:
                yield msg

    def _decode_msg(self, line):
        """Decode `line` to a message; return `None` on empty, unknown or undecodable lines."""
        if line:
            try:
                return self._msgdecoder.decode_line(line)
            except UnknownMsgError:
                # Messages without definition are silently ignored.
                pass
            except ValueError as exc:  # pragma: no cover
                _LOGGER.warning("Cannot decode message in %r: %s", line, exc)
        return None