kedder/openvario-shell

View on GitHub
src/ovshell_connman/manager.py

Summary

Maintainability
A
1 hr
Test Coverage
import asyncio
import types
import weakref
from typing import Any, Callable, Optional, Sequence, cast

from dbus_next import Variant
from dbus_next.errors import DBusError, InterfaceNotFoundError

from ovshell.api import AbstractMessageBus

from . import model
from .api import ConnmanManager, ConnmanNotAvailableException, ConnmanService
from .api import ConnmanState, ConnmanTechnology
from .dbusiface import ConnmanManagerProxyInterface, ConnmanServiceProxyInterface
from .dbusiface import ConnmanTechnologyProxyInterface


class ConnmanServiceProxy:
    _iface: Optional[ConnmanServiceProxyInterface]
    _change_handlers: list[weakref.WeakMethod]

    def __init__(self, svc: ConnmanService, bus: AbstractMessageBus) -> None:
        self.service = svc
        self._bus = bus
        self._iface = None
        self._change_handlers = []
        self._tracking = False
        self._tracking_lock = asyncio.Lock()

    async def connect(self) -> None:
        return await (await self._get_service_iface()).call_connect()

    async def remove(self) -> None:
        return await (await self._get_service_iface()).call_remove()

    async def disconnect(self) -> None:
        return await (await self._get_service_iface()).call_disconnect()

    async def start_tracking(self) -> None:
        async with self._tracking_lock:
            if self._tracking:
                return

            try:
                iface = await self._get_service_iface()
            except InterfaceNotFoundError:
                return
            iface.on_property_changed(self._on_property_changed)
            self._tracking = True

    def stop_tracking(self) -> None:
        if self._iface is None:
            return
        self._iface.off_property_changed(self._on_property_changed)

    def is_tracking(self) -> bool:
        return self._tracking

    def on_change(self, handler: Callable[[ConnmanService], None]) -> None:
        assert isinstance(handler, types.MethodType)
        self._change_handlers.append(weakref.WeakMethod(handler))

    def off_change(self, handler: Callable[[ConnmanService], None]) -> None:
        self._change_handlers = [wh for wh in self._change_handlers if wh() == handler]

    def _fire_changed(self) -> None:
        for wh in self._change_handlers:
            h = wh()
            if h is not None:
                h(self.service)

    def _on_property_changed(self, name: str, value: Variant) -> None:
        model.update_service_from_props(self.service, {name: value})
        self._fire_changed()

    async def _get_service_iface(self) -> ConnmanServiceProxyInterface:
        if self._iface is not None:
            return self._iface

        introspection = await self._bus.introspect("net.connman", self.service.path)
        proxy = self._bus.get_proxy_object(
            "net.connman", self.service.path, introspection
        )
        iface = proxy.get_interface("net.connman.Service")
        self._iface = cast(ConnmanServiceProxyInterface, iface)
        return self._iface


class ConnmanTechnologyProxy:
    def __init__(self, tech: ConnmanTechnology, bus: AbstractMessageBus) -> None:
        self._bus = bus
        self._tech = tech

    async def set_property(self, prop: str, value: Variant) -> None:
        iface = await self._get_tech_iface()
        return await iface.call_set_property(prop, value)

    async def scan(self) -> None:
        iface = await self._get_tech_iface()
        return await iface.call_scan()

    async def _get_tech_iface(self) -> ConnmanTechnologyProxyInterface:
        introspection = await self._bus.introspect("net.connman", self._tech.path)
        proxy = self._bus.get_proxy_object(
            "net.connman", self._tech.path, introspection
        )
        iface = proxy.get_interface("net.connman.Technology")
        return cast(ConnmanTechnologyProxyInterface, iface)


class ConnmanManagerImpl(ConnmanManager):
    technologies: list[ConnmanTechnology]
    _manager_props: dict[str, Variant]

    _tech_change_handlers: list[weakref.WeakMethod]
    _svc_change_handlers: list[weakref.WeakMethod]

    _svc_proxies: dict[str, ConnmanServiceProxy]
    _svc_order: list[str]

    def __init__(self, bus: AbstractMessageBus) -> None:
        self._bus = bus
        self.technologies = []
        self._manager_props = {}
        self._tech_change_handlers = []
        self._svc_change_handlers = []

        self._svc_proxies = {}
        self._svc_order = []

    async def setup(self) -> None:
        try:
            introspection = await self._bus.introspect("net.connman", "/")
            proxy = self._bus.get_proxy_object("net.connman", "/", introspection)
            self._manager_iface = cast(
                ConnmanManagerProxyInterface, proxy.get_interface("net.connman.Manager")
            )
        except DBusError as e:
            raise ConnmanNotAvailableException() from e

        self._subscribe_events(self._manager_iface)
        self._manager_props = await self._fetch_properties(self._manager_iface)
        await self._refresh_technologies()
        await self._refresh_services()

    def teardown(self) -> None:
        self._unsubscribe_events(self._manager_iface)
        for svcp in self._svc_proxies.values():
            svcp.stop_tracking()

    def list_services(self) -> Sequence[model.ConnmanService]:
        svcs = []
        for path in self._svc_order:
            sp = self._svc_proxies[path]
            svcs.append(sp.service)
        return svcs

    def on_service_property_changed(
        self, service: ConnmanService, handler: Callable[[model.ConnmanService], None]
    ) -> None:
        svcp = self._svc_proxies[service.path]
        svcp.on_change(handler)

    def off_service_property_changed(
        self, service: ConnmanService, handler: Callable[[ConnmanService], None]
    ) -> None:
        svcp = self._svc_proxies[service.path]
        svcp.off_change(handler)

    async def connect(self, service: ConnmanService) -> None:
        svcp = self._svc_proxies[service.path]
        await svcp.connect()
        await self._refresh_services()

    async def remove(self, service: ConnmanService) -> None:
        svcp = self._svc_proxies[service.path]
        await svcp.remove()
        await self._refresh_services()

    async def disconnect(self, service: ConnmanService) -> None:
        svcp = self._svc_proxies[service.path]
        await svcp.disconnect()
        await self._refresh_services()

    async def power(self, tech: ConnmanTechnology, on: bool) -> None:
        proxy = ConnmanTechnologyProxy(tech, self._bus)
        await proxy.set_property("Powered", Variant("b", on))
        await self._refresh_technologies()

    async def scan_all(self) -> int:
        ifaces = []
        for tech in self.technologies:
            if not tech.powered:
                continue
            if tech.type != "wifi":
                continue
            ifaces.append(ConnmanTechnologyProxy(tech, self._bus))
        if not ifaces:
            return 0

        scantasks = [asyncio.create_task(iface.scan()) for iface in ifaces]
        (done, pending) = await asyncio.wait(
            scantasks, return_when=asyncio.ALL_COMPLETED
        )
        return len([res.result for res in done])

    def get_state(self) -> ConnmanState:
        if "State" not in self._manager_props:
            return ConnmanState.UNKNOWN
        state = self._manager_props["State"]
        return ConnmanState(state.value)

    def on_technologies_changed(self, handler: Callable[[], None]) -> None:
        assert isinstance(handler, types.MethodType)
        self._tech_change_handlers.append(weakref.WeakMethod(handler))

    def on_services_changed(self, handler: Callable[[], None]) -> None:
        assert isinstance(handler, types.MethodType)
        self._svc_change_handlers.append(weakref.WeakMethod(handler))

    def _subscribe_events(self, iface: ConnmanManagerProxyInterface):
        iface.on_property_changed(self._notify_property_changed)
        iface.on_services_changed(self._notify_service_changed)
        iface.on_technology_added(self._notify_tech_added)
        iface.on_technology_removed(self._notify_tech_removed)

    def _unsubscribe_events(self, iface: ConnmanManagerProxyInterface):
        iface.off_property_changed(self._notify_property_changed)
        iface.off_services_changed(self._notify_service_changed)
        iface.off_technology_added(self._notify_tech_added)
        iface.off_technology_removed(self._notify_tech_removed)

    async def _refresh_technologies(self) -> None:
        self.technologies = await self._fetch_technologies(self._manager_iface)
        self._fire_tech_changed()

    async def _fetch_technologies(
        self, iface: ConnmanManagerProxyInterface
    ) -> list[ConnmanTechnology]:
        techs = await iface.call_get_technologies()
        res = []
        for path, tech in techs:
            res.append(model.create_technology_from_props(path, tech))

        return res

    async def _fetch_properties(
        self, iface: ConnmanManagerProxyInterface
    ) -> dict[str, Variant]:
        return await iface.call_get_properties()

    async def _refresh_services(self) -> None:
        svcs = await self._manager_iface.call_get_services()
        self._notify_service_changed(svcs, [])

    def _notify_property_changed(self, name: str, value: Variant) -> None:
        self._manager_props[name] = value

    def _notify_service_changed(
        self, changed: list[tuple[str, dict[str, Variant]]], removed: list[str]
    ) -> None:
        totrack = []
        # Update props
        for path, dbusprops in changed:
            svcp = self._svc_proxies.get(path)
            if svcp is None:
                if "Name" not in dbusprops or "Type" not in dbusprops:
                    # This is an update message, we cannot create a service
                    continue
                svc = model.create_service_from_props(path, dbusprops)
                svcp = ConnmanServiceProxy(svc, self._bus)
                self._svc_proxies[path] = svcp
            else:
                svc = svcp.service
                model.update_service_from_props(svc, dbusprops)
            if not svcp.is_tracking():
                totrack.append(path)

        self._svc_order = [path for path, _ in changed if path in self._svc_proxies]
        if totrack:
            asyncio.create_task(self._start_tracking_services(totrack))
        self._fire_svc_changed()

    async def _start_tracking_services(self, paths: list[str]) -> None:
        for path in paths:
            svcp = self._svc_proxies[path]
            await svcp.start_tracking()

    def _notify_tech_added(self, path: str, props: dict[str, Any]) -> None:
        tech = model.create_technology_from_props(path, props)
        self.technologies.append(tech)
        self._fire_tech_changed()

    def _notify_tech_removed(self, path: str) -> None:
        self.technologies = [t for t in self.technologies if t.path != path]
        self._fire_tech_changed()

    def _fire_tech_changed(self) -> None:
        for wh in self._tech_change_handlers:
            h = wh()
            if h is not None:
                h()

    def _fire_svc_changed(self) -> None:
        for wh in self._svc_change_handlers:
            h = wh()
            if h is not None:
                h()