LUXROBO/pymodi

View on GitHub
modi/modi.py

Summary

Maintainability
B
4 hrs
Test Coverage
"""Main MODI module."""

import sys
import time
import atexit
import logging

from importlib import import_module as im

from modi._exe_thrd import ExeThrd
from modi.util.connection_util import is_network_module_connected, is_on_pi
from modi.util.miscellaneous_util import ModuleList
from modi.util.strange_util import check_complete
from modi.util.topology_util import TopologyManager
from modi.util.firmware_updater import STM32FirmwareUpdater
from modi.util.firmware_updater import ESP32FirmwareUpdater

from modi.about import __version__


class MODI:
    network_uuids = {}

    def __call__(cls, *args, **kwargs):
        network_uuid = kwargs.get('network_uuid')
        conn_type = kwargs.get('conn_type')
        if conn_type != 'ble':
            return super(MODI, cls).__call__(*args, **kwargs)
        if not network_uuid:
            raise ValueError('Should input a valid network uuid!')
        if network_uuid not in cls.network_uuids:
            cls.network_uuids[network_uuid] = \
                super(MODI, cls).__call__(*args, **kwargs)
        return cls.network_uuids[network_uuid]

    def __init__(
        self, modi_version=1, conn_type="", verbose=False, port=None,
        network_uuid="", virtual_modules=None,
    ):
        if virtual_modules and conn_type != "vir":
            raise ValueError(
                "Virtual modules can only be defined in virtual connection"
            )
        self._modules = list()
        self._topology_data = dict()

        self._conn = self.__init_task(
            conn_type, verbose, port, network_uuid,
        )

        self._exe_thrd = ExeThrd(
            self._modules, self._topology_data, self._conn
        )
        print('Start initializing connected MODI modules')
        self._exe_thrd.start()

        self._topology_manager = TopologyManager(
            self._topology_data, self._modules
        )

        init_time = time.time()
        while not self._topology_manager.is_topology_complete():
            time.sleep(0.1)
            if time.time() - init_time > 3:
                print(
                    'MODI init timeout over. Check your module connection.'
                )
                break
        check_complete(self)
        print("MODI modules are initialized!")

        bad_modules = (
            self.__wait_user_code_check() if conn_type != 'ble' else []
        )
        if bad_modules:
            cmd = input(f"{[str(module) for module in bad_modules]} "
                        f"has user code in it.\n"
                        f"Reset the user code? [y/n] ")
            if 'y' in cmd:
                self.close()
                modules_to_reset = filter(
                    lambda m: m.is_up_to_date, bad_modules
                )
                modules_to_update = filter(
                    lambda m: not m.is_up_to_date, bad_modules
                )
                reset_module_firmware(
                    tuple(module.id for module in modules_to_reset)
                )
                update_module_firmware(
                    tuple(module.id for module in modules_to_update)
                )
                self.open()
        atexit.register(self.close)

    @staticmethod
    def __init_logger():
        logger = logging.getLogger(f'PyMODI (v{__version__}) Logger')
        logger.setLevel(logging.DEBUG)

        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler = logging.FileHandler('pymodi.log')
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(formatter)

        logger.addHandler(file_handler)
        return logger

    def __wait_user_code_check(self):
        def is_not_checked(module):
            return module.user_code_status < 0

        while list(filter(is_not_checked, self._modules)):
            time.sleep(0.1)
        bad_modules = []
        for module in self._modules:
            if module.has_user_code:
                bad_modules.append(module)
        return bad_modules

    def __init_task(
        self, conn_type, verbose, port, network_uuid,
    ):
        if not conn_type:
            is_can = not is_network_module_connected() and is_on_pi()
            conn_type = 'can' if is_can else 'ser'

        if conn_type == 'ser':
            return im('modi.task.ser_task').SerTask(verbose, port)
        elif conn_type == 'soc':
            return im('modi.task.soc_task').SocTask(verbose, port)
        elif conn_type == 'vir':
            return im('modi.task.vir_task').VirTask(verbose, port)
        elif conn_type == 'can':
            return im('modi.task.can_task').CanTask(verbose)
        elif conn_type == 'ble':
            if not network_uuid:
                raise ValueError('Network UUID not specified!')
            self.network_uuids[network_uuid] = self
            mod_path = {
                'win32': 'modi.task.ble_task.ble_task_mac',
                'linux': 'modi.task.ble_task.ble_task_rpi',
                'darwin': 'modi.task.ble_task.ble_task_mac',
            }.get(sys.platform)
            return im(mod_path).BleTask(verbose, network_uuid)
        else:
            raise ValueError(f'Invalid conn mode: {conn_type}')

    def open(self):
        atexit.register(self.close)
        self._exe_thrd = ExeThrd(
            self._modules, self._topology_data, self._conn
        )
        self._conn.open_conn()
        self._exe_thrd.start()

    def close(self):
        atexit.unregister(self.close)
        print("Closing MODI connection...")
        self._exe_thrd.close()
        self._conn.close_conn()

    def send(self, message):
        """Low level method to send json pkt directly to modules

        :param message: Json packet to send
        :return: None
        """
        self._conn.send_nowait(message)

    def recv(self):
        """Low level method to receive json pkt directly from modules

        :return: Json msg received
        :rtype: str if msg exists, else None
        """
        return self._conn.recv()

    def print_topology_map(self, print_id=False):
        """Prints out the topology map

        :param print_id: if True, the result includes module id
        :return: None
        """
        self._topology_manager.print_topology_map(print_id)

    @property
    def modules(self) -> ModuleList:
        """Module List of connected modules except network module.
        """
        return ModuleList(self._modules)

    @property
    def networks(self) -> ModuleList:
        return ModuleList(self._modules, 'network')

    @property
    def buttons(self) -> ModuleList:
        """Module List of connected Button modules.
        """
        return ModuleList(self._modules, 'button')

    @property
    def dials(self) -> ModuleList:
        """Module List of connected Dial modules.
        """
        return ModuleList(self._modules, "dial")

    @property
    def displays(self) -> ModuleList:
        """Module List of connected Display modules.
        """
        return ModuleList(self._modules, "display")

    @property
    def envs(self) -> ModuleList:
        """Module List of connected Env modules.
        """
        return ModuleList(self._modules, "env")

    @property
    def gyros(self) -> ModuleList:
        """Module List of connected Gyro modules.
        """
        return ModuleList(self._modules, "gyro")

    @property
    def irs(self) -> ModuleList:
        """Module List of connected Ir modules.
        """
        return ModuleList(self._modules, "ir")

    @property
    def leds(self) -> ModuleList:
        """Module List of connected Led modules.
        """
        return ModuleList(self._modules, "led")

    @property
    def mics(self) -> ModuleList:
        """Module List of connected Mic modules.
        """
        return ModuleList(self._modules, "mic")

    @property
    def motors(self) -> ModuleList:
        """Module List of connected Motor modules.
        """
        return ModuleList(self._modules, "motor")

    @property
    def speakers(self) -> ModuleList:
        """Module List of connected Speaker modules.
        """
        return ModuleList(self._modules, "speaker")

    @property
    def ultrasonics(self) -> ModuleList:
        """Module List of connected Ultrasonic modules.
        """
        return ModuleList(self._modules, "ultrasonic")


def update_module_firmware(target_ids=(0xFFF, )):
    updater = STM32FirmwareUpdater(target_ids=target_ids)
    updater.update_module_firmware()
    updater.close()


def reset_module_firmware(target_ids=(0xFFF, )):
    updater = STM32FirmwareUpdater(is_os_update=False, target_ids=target_ids)
    updater.update_module_firmware()
    updater.close()


def update_network_firmware(force=False):
    updater = ESP32FirmwareUpdater()
    updater.update_firmware(force=force)