kedder/openvario-shell

View on GitHub
src/ovshell/testing.py

Summary

Maintainability
A
3 hrs
Test Coverage
import asyncio
import os
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any, Callable, Coroutine, Generator, Iterable, Iterator, Optional
from typing import TypeVar

import urwid

from ovshell import api

JT = TypeVar("JT", bound=api.JsonType)


class AppManagerStub(api.AppManager):
    _app_infos: list[api.AppInfo]

    def __init__(self, log: list[str]) -> None:
        self._log = log
        self._app_infos = []

    def list_apps(self) -> Iterable[api.AppInfo]:
        return self._app_infos

    def get(self, appid: str) -> Optional[api.AppInfo]:
        appbyid = {ai.id: ai for ai in self._app_infos}
        return appbyid.get(appid)

    def pin(self, app: api.AppInfo, persist: bool = False) -> None:
        pass

    def unpin(self, app: api.AppInfo, persist: bool = False) -> None:
        pass

    def stub_add_app(
        self, id: str, app: api.App, ext: api.Extension, pinned: bool = False
    ) -> None:
        self._app_infos.append(api.AppInfo(id, app, ext, pinned))


class ExtensionManagerStub(api.ExtensionManager):
    def __init__(self, log: list[str]) -> None:
        self._log = log

    def list_extensions(self) -> Iterable[api.Extension]:
        return []


class DialogStub(api.Dialog):
    def __init__(self, title: str, content: urwid.Widget):
        self.title = title
        self.content = content
        self.buttons: dict[str, Callable[[], bool]] = {}

    def add_button(self, label: str, handler: Callable[[], bool]) -> None:
        self.buttons[label] = handler

    def no_buttons(self) -> None:
        self.buttons = {}

    def stub_press_button(self, label: str):
        return self.buttons[label]()


@dataclass
class TopIndicatorStub:
    id: str
    markup: api.UrwidText
    location: api.IndicatorLocation
    weight: int


class ScreenManagerStub(api.ScreenManager):
    _log: list[str]
    _activities: list[api.Activity]
    _tasks: list[tuple[api.Activity, asyncio.Task]]
    _dialog: Optional[DialogStub]
    _indicators: dict[str, TopIndicatorStub]
    _status: Optional[api.UrwidText]

    def __init__(self, log: list[str]) -> None:
        self._log = log
        self._activities = []
        self._tasks = []
        self._dialog = None
        self._indicators = {}
        self._status = None

    def draw(self) -> None:
        self._log.append("Screen redrawn")

    @contextmanager
    def suspended(self) -> Iterator[None]:
        yield

    def push_activity(
        self, activity: api.Activity, palette: Optional[list[tuple]] = None
    ) -> None:
        self._activities.append(activity)

    def pop_activity(self) -> None:
        if self._dialog is not None:
            # If dialog is open, close it
            self._dialog = None
            return

        self._activities.pop()

    def push_modal(self, activity: api.Activity, options: api.ModalOptions) -> None:
        self._activities.append(activity)

    def push_dialog(self, title: str, content: urwid.Widget) -> api.Dialog:
        self._dialog = DialogStub(title, content)
        return self._dialog

    def set_indicator(
        self,
        iid: str,
        markup: api.UrwidText,
        location: api.IndicatorLocation,
        weight: int,
    ) -> None:
        self._indicators[iid] = TopIndicatorStub(iid, markup, location, weight)

    def remove_indicator(self, iid: str) -> None:
        if iid in self._indicators:
            del self._indicators[iid]

    def spawn_task(self, activity: api.Activity, coro: Coroutine) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._tasks.append((activity, task))
        task.add_done_callback(self._task_done)
        return task

    def set_status(self, text: api.UrwidText) -> None:
        self._status = text

    def _task_done(self, task: asyncio.Task) -> None:
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            raise exc

    def stub_top_activity(self) -> Optional[api.Activity]:
        if not self._activities:
            return None
        return self._activities[-1]

    def stub_dialog(self) -> Optional[DialogStub]:
        return self._dialog

    def stub_cancel_tasks(self) -> None:
        for act, task in self._tasks:
            task.cancel()

    async def stub_wait_for_tasks(self, act: api.Activity) -> None:
        acttasks = [t for a, t in self._tasks if a is act]
        await asyncio.wait(acttasks)

    def stub_get_indicator(self, iid: str) -> Optional[TopIndicatorStub]:
        return self._indicators.get(iid)

    def stub_list_indicators(self) -> list[TopIndicatorStub]:
        return list(self._indicators.values())

    def stub_get_status(self) -> Optional[api.UrwidText]:
        return self._status


class StoredSettingsStub(api.StoredSettings):
    _settings: dict[str, Optional[api.JsonType]]

    def __init__(self, log: list[str]) -> None:
        self._log = log
        self._settings = {}

    def setdefault(self, key: str, value: api.JsonType) -> None:
        self._settings.setdefault(key, value)

    def set(self, key: str, value: Optional[api.JsonType], save: bool = False):
        self._settings[key] = value

    def get(
        self, key: str, type: type[JT], default: Optional[JT] = None
    ) -> Optional[JT]:
        v = self._settings.get(key, default)
        return v if isinstance(v, type) else None

    def getstrict(self, key: str, type: type[JT]) -> JT:
        v = self.get(key, type)
        assert v is not None
        return v

    def save(self) -> None:
        pass


class OSPRocessStub(api.OSProcess):
    def __init__(
        self, returncode: int = 0, stdout: bytes = b"", stderr: bytes = b""
    ) -> None:
        self._returncode = returncode
        self.stdout = asyncio.streams.StreamReader()
        self.stdout.feed_data(stdout)
        self.stdout.feed_eof()
        self.stderr = asyncio.streams.StreamReader()
        self.stderr.feed_data(stderr)
        self.stderr.feed_eof()

    async def wait(self) -> int:
        await asyncio.sleep(0)
        return self._returncode


class MessageBusIntrospectionStub(api.AbstractMessageBusIntrospection):
    def __init__(self, bus_name: str, path: str) -> None:
        self.bus_name = bus_name
        self.path = path


class MessageBusProxyObjectStub(api.AbstractMessageBusProxyObject):
    def __init__(
        self,
        bus_name: str,
        path: str,
        introspection: api.AbstractMessageBusIntrospection,
        impls: dict[str, Any],
    ) -> None:
        self.__bus_name = bus_name
        self.__path = path
        self.__introspection = introspection
        self.__impls = impls

    def get_interface(self, name: str):
        return self.__impls[name]


class MessageBusStub(api.AbstractMessageBus):
    _impls: dict[str, dict[str, Any]]
    _exported: dict[str, Any]

    def __init__(self) -> None:
        self._impls = {}
        self._exported = {}

    async def introspect(
        self, bus_name: str, path: str, timeout: float = 30.0
    ) -> MessageBusIntrospectionStub:
        return MessageBusIntrospectionStub(bus_name, path)

    def get_proxy_object(
        self,
        bus_name: str,
        path: str,
        introspection: api.AbstractMessageBusIntrospection,
    ) -> MessageBusProxyObjectStub:
        return MessageBusProxyObjectStub(
            bus_name, path, introspection, self._impls.get(path, {})
        )

    def export(self, path: str, agent: Any) -> None:
        self._exported[path] = agent

    def stub_register_interface(self, path: str, iface_name: str, impl: Any) -> None:
        ifaces = self._impls.setdefault(path, {})
        ifaces[iface_name] = impl

    def stub_get_exported(self) -> dict[str, Any]:
        return self._exported


class OpenVarioOSStub(api.OpenVarioOS):
    _stub_run_returncode: int = 0
    _stub_run_stdout: bytes = b""
    _stub_run_stderr: bytes = b""
    _stub_bus: Optional[MessageBusStub] = None
    _stub_bus_connected: "asyncio.Future[MessageBusStub]"

    def __init__(self, log: list[str], rootfs: str) -> None:
        self._log = log
        self._rootfs = rootfs
        self._stub_bus_connected = asyncio.Future()

    def path(self, path: str) -> str:
        assert path.startswith("/"), "Absolute path is required"
        if not path.startswith("//"):
            return path

        return os.path.join(self._rootfs, path[2:])

    async def run(self, command: str, args: list[str]) -> api.OSProcess:
        self._log.append(f"OS: Running {command} {' '.join(args)}")
        return OSPRocessStub(
            self._stub_run_returncode, self._stub_run_stdout, self._stub_run_stderr
        )

    def sync(self) -> None:
        self._log.append("OS: Sync")

    def shut_down(self) -> None:
        self._log.append("OS: Shut down")

    def restart(self) -> None:
        self._log.append("OS: Restart")

    def spawn_shell(self) -> None:
        self._log.append("OS: Run shell")

    def stub_expect_run(
        self, result: int = 0, stdout: bytes = b"", stderr: bytes = b""
    ) -> None:
        self._stub_run_returncode = result
        self._stub_run_stdout = stdout
        self._stub_run_stderr = stderr

    async def get_system_bus(self) -> api.AbstractMessageBus:
        return await self._stub_bus_connected

    def stub_connect_bus(self) -> MessageBusStub:
        self._stub_bus = MessageBusStub()
        self._stub_bus_connected.set_result(self._stub_bus)
        return self._stub_bus

    def stub_fail_bus(self) -> None:
        self._stub_bus_connected.set_exception(
            api.DBusNotAvailableException("Stub connection failed")
        )


class NMEAStreamStub(api.NMEAStream):
    def __init__(self, nmeas: list[api.NMEA]) -> None:
        self._nmeas = list(reversed(nmeas))

    async def read(self) -> api.NMEA:
        return self._nmeas.pop()

    def __aiter__(self):
        return self

    async def __anext__(self):
        return await self.read()


class DeviceManagerStub(api.DeviceManager):
    _devices: list[api.Device]
    _nmeas: list[api.NMEA]

    def __init__(self, log: list[str]) -> None:
        self._log = log
        self._devices = list()
        self._nmeas = list()

    def register(self, device: api.Device) -> None:
        self._devices.append(device)
        self._log.append(f"Registered device {device.id}")

    def enumerate(self) -> list[api.Device]:
        return self._devices

    @contextmanager
    def open_nmea(self) -> Generator[api.NMEAStream, None, None]:
        yield NMEAStreamStub(self._nmeas)

    def stub_add_nmea(self, nmeas: list[api.NMEA]) -> None:
        self._nmeas.extend(nmeas)

    def stub_remove_device(self, devid: str) -> None:
        self._devices = [dev for dev in self._devices if dev.id != devid]


class ProcessManagerStub(api.ProcessManager):
    def __init__(self, log: list[str]) -> None:
        self._log = log

    def start(self, coro: Coroutine) -> asyncio.Task:
        return asyncio.create_task(coro)


class OpenVarioShellStub(api.OpenVarioShell):
    _log: list[str]

    def __init__(self, fsroot: str) -> None:
        self._log = []
        self.apps = AppManagerStub(self._log)
        self.extensions = ExtensionManagerStub(self._log)
        self.screen = ScreenManagerStub(self._log)
        self.settings = StoredSettingsStub(self._log)
        self.os = OpenVarioOSStub(self._log, fsroot)
        self.devices = DeviceManagerStub(self._log)
        self.processes = ProcessManagerStub(self._log)

        self._fsroot = fsroot

    def get_stub_log(self) -> list[str]:
        return self._log

    def stub_teardown(self) -> None:
        self.screen.stub_cancel_tasks()