odin-detector/odin-data

View on GitHub
python/src/odin_data/control/odin_data_controller.py

Summary

Maintainability
D
2 days
Test Coverage
"""
Created on 30th November 2023

:author: Alan Greer
"""

import logging
import threading
import time

from deepdiff import DeepDiff
from odin.adapters.parameter_tree import ParameterTree

from odin_data.control.ipc_tornado_client import IpcTornadoClient


class OdinDataController(object):
    def __init__(self, name, endpoints, update_interval=0.5):
        """Create the controller, its clients and the background update thread.

        :param name: module name, returned by the "module" entry of the tree
        :param endpoints: comma separated "address:port" strings, one per client
        :param update_interval: seconds slept between passes of the update loop
        """
        self._name = name
        self._api = 0.1
        self._error = ""
        self._update_interval = update_interval
        self._config_cache = None
        self._endpoints = []
        self._clients = []
        self._client_connections = []

        # Parse every endpoint before creating any client, so a malformed
        # entry raises before the first client object has been built.
        for entry in endpoints.split(","):
            entry = entry.strip()
            logging.debug("Endpoint: %s", entry)
            fields = entry.split(":")
            self._endpoints.append({"ip_address": fields[0], "port": int(fields[1])})

        # One client per endpoint, each paired with a connection flag that
        # starts out False.
        for endpoint in self._endpoints:
            address, port = endpoint["ip_address"], endpoint["port"]
            logging.debug("Creating client {}:{}".format(address, port))
            self._clients.append(IpcTornadoClient(address, port))
            self._client_connections.append(False)

        # Build the controller specific tree definition, then wrap it
        self.setup_parameter_tree()
        # TODO: Consider renaming this
        self._params = ParameterTree(self._tree, mutable=True)

        # Start the status/config thread; the run flag is set before start()
        # because update_loop tests it straight away.
        self._status_lock = threading.Lock()
        self._status_running = True
        self._status_thread = threading.Thread(target=self.update_loop)
        self._status_thread.start()

    def setup_parameter_tree(self):
        self._tree = {
            "api": (lambda: self._api, None, {}),
            "module": (lambda: self._name, None, {}),
            "endpoints": [],
            "count": (lambda: len(self._clients), None, {}),
            "update_interval": (lambda: self._update_interval, None, {}),
        }
        for idx, endpoint in enumerate(self._endpoints):
            self._tree["endpoints"].append(
                # Note the default here binds unique variables into each closure
                {k: (lambda v=v: v, None, {}) for k, v in endpoint.items()}
            )
        for idx, _client in enumerate(self._clients):
            self._tree[str(idx)] = {
                "status": {"error": (lambda: self._error, None, {})},
                "config": {},
            }

    def merge_external_tree(self, path, tree):
        """Attach another tree definition and rebuild the parameter tree.

        :param path: key under which the definition is stored at the top level
        :param tree: tree definition to store under that key
        """
        # Anything already stored under this key is overwritten
        self._tree.update({path: tree})
        # The ParameterTree is created from the whole definition, so build it again
        self._params = ParameterTree(self._tree, mutable=True)

    def set_error(self, err):
        # Record the error message into the status
        self._error

    def clear_error(self):
        # Clear the error message out of the status dict
        self._error = ""

    def get(self, path, meta):
        """
        Return the ParameterTree value for the supplied path

        :param path: URI path of the GET request
        :param meta: Should the ParameterTree return the meta data associated with the value
        :return: dict object containing the value and meta data if requested
        """
        return self._params.get(path, meta)

    def put(self, path, value):
        self._params.set(path, value)
        self.process_config_changes()

    def update_loop(self):
        """Handle background update loop tasks.

        This method is the target of the status thread created in ``__init__``
        and keeps looping until ``self._status_running`` is cleared (see
        ``shutdown``).  On every pass it records the connection state of each
        client, sends it "status" and "request_configuration" requests, calls
        ``handle_client``, copies the client's "status" and "config"
        parameters into the parameter tree, rebuilds the cached configurations
        and calls ``process_updates``.  It then sleeps for
        ``self._update_interval`` seconds.
        """
        logging.debug("Starting the status/config update thread...")

        while self._status_running:
            try:
                # Handle background tasks
                # Loop over every client, whether or not it is currently connected
                for index, client in enumerate(self._clients):
                    try:
                        # First check for stale status within a client (1 seconds)
                        # client.check_for_stale_status(1.0)
                        # Now check for a transition from disconnected to connected
                        # (at present the flag is simply updated to match)
                        if not client.connected():
                            self._client_connections[index] = False
                        else:
                            if not self._client_connections[index]:
                                self._client_connections[index] = True

                    except Exception as e:
                        # Exception caught, log the error but do not stop the update loop
                        logging.error("Unhandled exception: %s", e)

                    # Request parameter updates
                    for parameter_tree in ["status", "request_configuration"]:
                        try:
                            msg = client.send_request(parameter_tree)
                            # A truthy result from wait_for_response is reported as a timeout
                            if client.wait_for_response(msg.get_msg_id()):
                                logging.error(
                                    f"{parameter_tree} request to "
                                    f"{client.ctrl_endpoint} timed out"
                                )
                        except Exception as e:
                            # Log the error, but do not stop the update loop
                            logging.error("Unhandled exception: %s", e)

                    # A failed request does not skip the steps below; they use
                    # whatever client.parameters currently holds.
                    self.handle_client(client, index)
                    if "status" in client.parameters:
                        self._params.replace(
                            f"{index}/status", client.parameters["status"]
                        )
                    if "config" in client.parameters:
                        self._params.replace(
                            f"{index}/config", client.parameters["config"]
                        )

                # Snapshot the config of every client; process_config_changes
                # compares the parameter tree against this cache.
                self._config_cache = [
                    self._params.get(f"{idx}/config")
                    for idx, _ in enumerate(self._clients)
                ]

                self.process_updates()

            except Exception as ex:
                # Anything not caught above (including errors raised by
                # handle_client or process_updates) is logged and the loop continues
                logging.error("{}".format(ex))

            # The sleep sits outside the try block, so it runs after every pass
            time.sleep(self._update_interval)

    def handle_client(self, client, index):
        """Called on each client in the update_loop loop before updating the
        parameter tree and caching the config, can be overloaded by
        subclasses to implement controller specific logic.
        """
        pass

    def process_config_changes(self):
        """Send locally changed config values down to the applications.

        The current "config" branch of every client is read from the parameter
        tree and compared with the cached version using DeepDiff.  Entries
        reported under "values_changed" are gathered into one nested
        dictionary per client, and every client with at least one change is
        sent its dictionary.
        This method must be called after the set method is called and needs to
        be executed in its own thread to avoid blocking the tornado loop.
        """
        current = [
            self._params.get(f"{position}/config")
            for position in range(len(self._clients))
        ]
        delta = DeepDiff(self._config_cache, current)

        if "values_changed" not in delta:
            # No changed values were reported, so there is nothing to send
            return

        logging.debug("Config deltas: %s", delta)
        changes = delta["values_changed"]

        # One slot per client; a slot stays None while that client has no changes
        pending = [None for _ in self._clients]
        for root in changes:
            # Turn "root[0]['config']['a']" into ["0", "config", "a"]
            trimmed = root.replace("root[", "").rstrip("]")
            keys = trimmed.replace("'", "").split("][")
            logging.debug("Path: {}".format(keys))

            # keys[0] is the client index and keys[1] is the key 'config'
            client_index = int(keys[0])
            if pending[client_index] is None:
                pending[client_index] = {}
            client_cfg = pending[client_index]
            logging.debug("Client config [%s]: %s", client_index, client_cfg)

            # Walk (creating as needed) the intermediate levels, then set the leaf
            node = client_cfg
            for key in keys[2:-1]:
                if key not in node:
                    node[key] = {}
                node = node[key]
            node[keys[-1]] = changes[root]["new_value"]

        logging.info("Sending configs: %s", pending)

        # pending lines up with self._clients position by position
        for client, config in zip(self._clients, pending):
            if config is not None:
                client.send_configuration(config)

    def create_demand_config(self, new_params, old_params):
        config = None
        for item in new_params:
            logging.debug("Param: {}".format(item))
            logging.debug("   Type: {}".format(type(new_params[item])))
            if item in old_params:
                if isinstance(new_params[item], dict):
                    diff = self.create_demand_config(new_params[item], old_params[item])
                    if diff is not None:
                        if config is None:
                            config = {}
                        config[item] = diff
                elif isinstance(new_params[item], list):
                    if config is None:
                        config = {item: []}
                    for new_item, old_item in zip(new_params[item], old_params[item]):
                        if isinstance(new_item, dict):
                            config[item].append(
                                self.create_demand_config(new_item, old_item)
                            )
                        else:
                            if new_item != old_item:
                                config[item].append(new_item)
                else:
                    if new_params[item] != old_params[item]:
                        if config is None:
                            config = {}
                        config[item] = new_params[item]
        return config

    def process_updates(self):
        """Handle additional background update loop tasks

        Child classes can implement logic here to take any action based on the
        latest parameter tree, before the next update is scheduled.

        """
        pass

    def shutdown(self):
        self._status_running = False

    def _set(self, attr, val):
        logging.debug("_set called: {}  {}".format(attr, val))
        setattr(self, attr, val)

    def _get(self, attr):
        return lambda: getattr(self, attr)