bparzella/secsgem

View on GitHub
secsgem/secs/handler.py

Summary

Maintainability
A
3 hrs
Test Coverage
A
100%
#####################################################################
# handler.py
#
# (c) Copyright 2013-2023, Benjamin Parzella. All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#####################################################################
"""Handler for SECS commands."""
from __future__ import annotations

import logging
import typing

import secsgem.common
import secsgem.hsms


class SecsHandler:  # pylint: disable=too-many-instance-attributes,too-many-public-methods
    """Baseclass for creating Host/Equipment models. This layer contains the SECS functionality.

    Inherit from this class and override required functions.

    Messages received from the protocol are dispatched to callbacks that are
    looked up by a name of the form ``sXXfYY`` (see
    :meth:`register_stream_function`). A callback is invoked as
    ``callback(handler, message)``; a return value other than ``None`` is sent
    back as the response to the received message.
    """

    def __init__(self, settings: secsgem.common.Settings):
        """Initialize a secs handler.

        Args:
            settings: settings defining protocol and connection

        """
        self._settings = settings

        # Create everything the message handler relies on (logger, callback
        # handler) before subscribing to protocol events, so the handler never
        # runs against a partially initialized object.
        self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__)

        self._callback_handler = secsgem.common.CallbackHandler()
        # NOTE(review): presumably lets the callback handler resolve callbacks
        # on this object as well - confirm in secsgem.common.CallbackHandler.
        self._callback_handler.target = self

        self._protocol = settings.create_protocol()
        self._protocol.events.message_received += self._on_message_received

    @property
    def settings(self) -> secsgem.common.Settings:
        """Get the setting object."""
        return self._settings

    @staticmethod
    def _generate_sf_callback_name(stream: int, function: int) -> str:
        """Build the callback name for a stream/function pair, e.g. ``s01f03``."""
        return f"s{stream:02d}f{function:02d}"

    @property
    def protocol(self) -> secsgem.common.Protocol:
        """Get the connection for the handler."""
        return self._protocol

    def enable(self):
        """Enable the connection."""
        self.protocol.enable()

    def disable(self):
        """Disable the connection."""
        self.protocol.disable()

    def send_response(self, *args, **kwargs):
        """Forward all arguments to the protocol's ``send_response`` and return its result."""
        return self.protocol.send_response(*args, **kwargs)

    def send_and_waitfor_response(self, *args, **kwargs):
        """Forward all arguments to the protocol's ``send_and_waitfor_response`` and return its result."""
        return self.protocol.send_and_waitfor_response(*args, **kwargs)

    def send_stream_function(self, *args, **kwargs):
        """Forward all arguments to the protocol's ``send_stream_function`` and return its result."""
        return self.protocol.send_stream_function(*args, **kwargs)

    @property
    def events(self):
        """Get the event container of the protocol."""
        return self.protocol.events

    @property
    def callbacks(self):
        """Get the callback handler used for dispatching received messages."""
        return self._callback_handler

    def register_stream_function(self, stream: int, function: int, callback):
        """Register the function callback for stream and function.

        Args:
            stream: stream to register callback for
            function: function to register callback for
            callback: callable invoked as ``callback(handler, message)`` when the
                stream/function is received. A return value other than ``None``
                is sent as the response.

        """
        name = self._generate_sf_callback_name(stream, function)
        setattr(self._callback_handler, name, callback)

    def unregister_stream_function(self, stream: int, function: int):
        """Unregister the function callback for stream and function.

        Args:
            stream: stream to unregister callback for
            function: function to unregister callback for

        """
        name = self._generate_sf_callback_name(stream, function)
        # The callback handler is given None for this name to drop the callback.
        setattr(self._callback_handler, name, None)

    def _handle_stream_function(self, message):
        """Dispatch a received message to its registered callback.

        If no callback is available, a warning is logged and, when the message
        requires a response, S09F05 carrying the encoded header is sent.

        If the callback (or sending its result) raises, the exception is logged
        and the function 0 message of the received stream is sent as abort.
        A failure while building or sending that abort is logged as well and
        never propagates to the caller.

        Args:
            message: received message

        """
        sf_callback_index = self._generate_sf_callback_name(message.header.stream, message.header.function)

        # return S09F05 if no callback present
        if sf_callback_index not in self._callback_handler:
            self.logger.warning("unexpected function received %s\n%s", sf_callback_index, message.header)
            if message.header.require_response:
                self.send_response(self.stream_function(9, 5)(message.header.encode()), message.header.system)

            return

        try:
            callback = getattr(self._callback_handler, sf_callback_index)
            result = callback(self, message)
            if result is not None:
                self.send_response(result, message.header.system)
        except Exception:  # pylint: disable=broad-except
            self.logger.exception("Callback aborted because of exception, abort sent")
            try:
                self.send_response(self.stream_function(message.header.stream, 0)(), message.header.system)
            except Exception:  # pylint: disable=broad-except
                # stream_function raises KeyError when SxxF00 is not defined and
                # sending may fail too; neither may escape the message handler.
                self.logger.exception("Sending abort for %s failed", sf_callback_index)

    def _on_message_received(self, data: dict[str, typing.Any]):
        """Message received from protocol layer.

        Args:
            data: received data

        """
        message = data["message"]

        # check if callbacks available for this stream and function
        self._handle_stream_function(message)

    def disable_ceids(self):
        """Disable all Collection Events.

        Sends S02F37 with ``CEED`` set to ``False`` and an empty ``CEID`` list.

        Returns:
            result of ``send_and_waitfor_response``

        """
        self.logger.info("Disable all collection events")

        return self.send_and_waitfor_response(self.stream_function(2, 37)({"CEED": False, "CEID": []}))

    def disable_ceid_reports(self):
        """Disable all Collection Event Reports.

        Sends S02F33 with ``DATAID`` 0 and an empty ``DATA`` list.

        Returns:
            result of ``send_and_waitfor_response``

        """
        self.logger.info("Disable all collection event reports")

        return self.send_and_waitfor_response(self.stream_function(2, 33)({"DATAID": 0, "DATA": []}))

    def list_svs(self, svs=None):
        """Get list of available Service Variables.

        Args:
            svs: Service Variables to send with the S01F11 request; an empty
                list is sent when omitted

        Returns:
            decoded reply to the S01F11 request

        """
        self.logger.info("Get list of service variables")

        if svs is None:
            svs = []

        message = self.send_and_waitfor_response(self.stream_function(1, 11)(svs))

        return self.settings.streams_functions.decode(message)

    def request_svs(self, svs):
        """Request contents of supplied Service Variables.

        Args:
            svs: Service Variables to request

        Returns:
            decoded reply to the S01F03 request

        """
        self.logger.info("Get value of service variables %s", svs)

        message = self.send_and_waitfor_response(self.stream_function(1, 3)(svs))

        return self.settings.streams_functions.decode(message)

    def request_sv(self, sv_id):
        """Request contents of one Service Variable.

        Args:
            sv_id: id of Service Variable

        Returns:
            first element of the decoded reply of :meth:`request_svs`

        """
        self.logger.info("Get value of service variable %s", sv_id)

        return self.request_svs([sv_id])[0]

    def list_ecs(self, ecs=None):
        """Get list of available Equipment Constants.

        Args:
            ecs: Equipment Constants to send with the S02F29 request; an empty
                list is sent when omitted

        Returns:
            decoded reply to the S02F29 request

        """
        self.logger.info("Get list of equipment constants")

        if ecs is None:
            ecs = []
        message = self.send_and_waitfor_response(self.stream_function(2, 29)(ecs))

        return self.settings.streams_functions.decode(message)

    def request_ecs(self, ecs):
        """Request contents of supplied Equipment Constants.

        Args:
            ecs: Equipment Constants to request

        Returns:
            decoded reply to the S02F13 request

        """
        self.logger.info("Get value of equipment constants %s", ecs)

        message = self.send_and_waitfor_response(self.stream_function(2, 13)(ecs))

        return self.settings.streams_functions.decode(message)

    def request_ec(self, ec_id):
        """Request contents of one Equipment Constant.

        Args:
            ec_id: id of Equipment Constant

        Returns:
            the complete decoded reply of :meth:`request_ecs` for the single id

        """
        self.logger.info("Get value of equipment constant %s", ec_id)

        # NOTE(review): unlike request_sv the reply is not indexed with [0];
        # kept unchanged so existing callers keep receiving the same shape.
        return self.request_ecs([ec_id])

    def set_ecs(self, ecs):
        """Set contents of supplied Equipment Constants.

        Args:
            ecs: list containing list of id / value pairs

        Returns:
            result of ``get()`` on the decoded reply to the S02F15 request

        """
        self.logger.info("Set value of equipment constants %s", ecs)

        message = self.send_and_waitfor_response(self.stream_function(2, 15)(ecs))

        return self.settings.streams_functions.decode(message).get()

    def set_ec(self, ec_id, value):
        """Set contents of one Equipment Constant.

        Args:
            ec_id: id of Equipment Constant
            value: new content of Equipment Constant

        Returns:
            result of :meth:`set_ecs` for the single id / value pair

        """
        self.logger.info("Set value of equipment constant %s to %s", ec_id, value)

        return self.set_ecs([[ec_id, value]])

    def send_equipment_terminal(self, terminal_id, text):
        """Send text to equipment terminal.

        Sends S10F03 with ``TID`` and ``TEXT``.

        Args:
            terminal_id: ID of terminal
            text: text to send

        Returns:
            result of ``send_and_waitfor_response``

        """
        self.logger.info("Send text to terminal %s", terminal_id)

        return self.send_and_waitfor_response(self.stream_function(10, 3)({"TID": terminal_id, "TEXT": text}))

    def are_you_there(self):
        """Check if remote is still replying.

        Sends S01F01 without data.

        Returns:
            result of ``send_and_waitfor_response``

        """
        self.logger.info("Requesting 'are you there'")

        return self.send_and_waitfor_response(self.stream_function(1, 1)())

    def stream_function(self, stream: int, function: int) -> type[secsgem.secs.SecsStreamFunction]:
        """Get class for stream and function.

        Args:
            stream: stream to get class for
            function: function to get class for

        Returns:
            class for function

        Raises:
            KeyError: the settings' function list has no entry for the stream/function

        """
        klass = self.settings.streams_functions.function(stream, function)

        if klass is None:
            raise KeyError(f"Undefined function requested: S{stream:02d}F{function:02d}")

        return klass