smok-serwis/smok-client

View on GitHub
smok/sensor/sensor.py

Summary

Maintainability
A
0 mins
Test Coverage
F
33%
import logging
import typing as tp
import weakref

from satella.coding.transforms import merge_series
from satella.coding.typing import Number
from satella.time import time_ms

from smok.exceptions import OperationFailedError
from smok.pathpoint.orders import Section, ReadOrder, AdviseLevel
from smok.sensor.types import SensorValueType, get_type, SVTOrExcept
from .write_logs import SensorWriteEvent

logger = logging.getLogger(__name__)


def fqtsify(tag_name: tp.Union[str, tp.Set[str]]) -> str:
    """
    Standarize the name, for use in dictionaries and other places that access sensors by name

    :param tag_name: :term:`Tag name`, either a space-separated set of names or a set of names
        proper
    :return: :term:`FQTS`-ified name
    """
    if isinstance(tag_name, set):
        tag_name = list(tag_name)
    elif isinstance(tag_name, str):
        tag_name = tag_name.split(' ')
    tag_name.sort()
    return ' '.join(tag_name)


class Sensor:
    """
    A class representing a smok-client sensor defined on given device.

    Class is eq-able and hashable by fqts.

    .. warning:: Do not compare sensors across different devices!

    :ivar fqts: this sensor's :term:`FQTS` (str)
    :ivar path: composite pathpoint names, separated by a tilde (~) (str)
    :ivar type_name: name of the sensor type (str)
    :ivar type: object used for data conversion between pathpoints and sensor values
        (:class:`smok.sensor.types.BasicType`)
    """
    __slots__ = ('fqts', 'path', 'type_name', '_pathpoint_names', 'type', 'device',
                 'slave_pathpoints')

    def __eq__(self, other) -> bool:
        if not isinstance(other, Sensor):
            return False
        return self.fqts == other.fqts

    def __hash__(self) -> int:
        return hash(self.fqts)

    def __init__(self, device: 'SMOKDevice', fqts: str, path: str, type_name: str):
        self.device = weakref.proxy(device)
        self.fqts = fqtsify(fqts)
        self.path = path
        self.type_name = type_name
        self.slave_pathpoints = []
        self._pathpoint_names = []
        for pn in path.split('~'):
            self._pathpoint_names.append(pn)
            self.slave_pathpoints.append(self.device.get_pathpoint(pn))
        self.type = get_type(type_name)

    def get_archive(self,
                    starting_at: int,
                    stopping_at: tp.Optional[int] = None) -> tp.Iterator[
        tp.Tuple[int, SVTOrExcept]]:
        """
        Get archive readings.

        This will be readed entirely from the device's data, the server will not be queried

        :param starting_at: timestamp of start, in milliseconds
        :param stopping_at: timestamp of end, in milliseconds, or None for the end of the park
        :return: an iterator of tuple (timestamp in milliseconds,
                                       pathpoint value or exception instance)
        """
        archives = []
        for slave in self.slave_pathpoints:
            archives.append(slave.get_archive(starting_at, stopping_at))
        merged = merge_series(*archives)
        for ts, *values in merged:
            for v in values:
                if isinstance(v, OperationFailedError):
                    yield ts, v
                    break
            else:
                yield ts, self.type.pathpoint_to_sensor(*values)

    def log_write(self, who: str, reason: str, value: str, hr_value: tp.Optional[str] = None,
                  hr_sensor: tp.Optional[str] = None, timestamp: tp.Optional[int] = None) -> None:
        """
        Log that a write has taken place in this sensor

        :param who: who has made this change
        :param reason: reason for this write
        :param value: a str-able form of value
        :param hr_value: a human-readable value. Defaults to value
        :param hr_sensor: a human-readable name for the sensor. Defaults to fqts
        :param timestamp: in milliseconds! Leave at default for current timestamp
        """
        if hr_value is None:
            hr_value = value
        if hr_sensor is None:
            hr_sensor = self.fqts
        if timestamp is None:
            timestamp = time_ms()
        perm = SensorWriteEvent(fqts=self.fqts, hr_sensor=hr_sensor, hr_value=hr_value, value=value,
                                reason=reason, who=who, timestamp=timestamp)
        self.device.log_sensor_write(perm)

    def write(self, value, advise: AdviseLevel = AdviseLevel.ADVISE) -> Section:
        """
        Write a particular value to the sensor.

        Take care for the value to match the type of the sensor

        :param value: value to write
        :param advise: advise level to use
        :raises TypeError: invalid type
        """
        values = self.type.sensor_to_pathpoint(value, *(pp.name for pp in self.slave_pathpoints))
        section = Section()
        for value, pp in zip(values, self.slave_pathpoints):
            section += pp.write(value, advise)
        return section

    def get(self) -> tp.Tuple[Number, SensorValueType]:
        """
        Return the value of this sensor

        :raises NotReadedError: required pathpoints are not available
        :return: a tuple of (timestamp in milliseconds, sensor value)
        :raises OperationFailedError: one of pathpoint failed to provide a value
        """
        vals = [pp.get() for pp in self.slave_pathpoints]
        cur_ts = max(ts[0] for ts in vals)
        return cur_ts, self.type.pathpoint_to_sensor(*(val[1] for val in vals))

    def read(self, advise: AdviseLevel = AdviseLevel.ADVISE) -> Section:
        return Section([ReadOrder(y, advise) for y in self._pathpoint_names])

    @classmethod
    def from_json(cls, device: 'SMOKDevice', x: dict) -> 'Sensor':
        return Sensor(device, x['fqts'], x['path'], x['type'])