2Fake/devolo_plc_api

View on GitHub
devolo_plc_api/clients/protobuf.py

Summary

Maintainability
A
0 mins
Test Coverage
"""Google Protobuf client."""
from __future__ import annotations

import asyncio
import logging
from abc import ABC, abstractmethod
from hashlib import sha256
from http import HTTPStatus
from typing import Any, Callable

from httpx import (
    AsyncClient,
    ConnectError,
    ConnectTimeout,
    DigestAuth,
    HTTPStatusError,
    ReadTimeout,
    RemoteProtocolError,
    Response,
)

from devolo_plc_api.exceptions import DevicePasswordProtected, DeviceUnavailable

TIMEOUT = 10.0


class Protobuf(ABC):
    """Google Protobuf client as ground work."""

    @abstractmethod
    def __init__(self) -> None:
        """Initialize the client."""
        self._logger = logging.getLogger(f"{self.__class__.__module__}.{self.__class__.__name__}")

        self.password: str

        self._ip: str
        self._path: str
        self._port: int
        self._session: AsyncClient
        self._user: str
        self._version: str

    def __getattr__(self, attr: str) -> Callable[..., Any]:
        """Catch attempts to call methods synchronously."""

        def method(*args: Any, **kwargs: Any) -> Any:
            return asyncio.run(getattr(self, async_method)(*args, **kwargs))

        async_method = f"async_{attr}"
        if hasattr(self.__class__, async_method):
            return method
        raise AttributeError(f"{self.__class__.__name__} object has no attribute {attr}")  # noqa: EM102, TRY003

    @property
    def url(self) -> str:
        """The base URL to query."""
        return f"http://{self._ip}:{self._port}/{self._path}/{self._version}/"

    async def _async_get(self, sub_url: str, timeout: float = TIMEOUT) -> Response:
        """Query URL asynchronously."""
        url = f"{self.url}{sub_url}"
        self._logger.debug("Getting from %s", url)
        return await self._async_request("GET", url, None, timeout)

    async def _async_post(self, sub_url: str, content: bytes, timeout: float = TIMEOUT) -> Response:
        """Post data asynchronously."""
        url = f"{self.url}{sub_url}"
        self._logger.debug("Posting to %s", url)
        return await self._async_request("POST", url, content, timeout)

    async def _async_request(self, method: str, url: str, content: bytes | None, timeout: float = TIMEOUT) -> Response:
        """Request data asynchronously."""
        try:
            response = await self._session.request(
                method,
                url,
                auth=DigestAuth(self._user, self.password),
                content=content,
                timeout=timeout,
            )
            if response.status_code == HTTPStatus.UNAUTHORIZED:
                self.password = sha256(self.password.encode("utf-8")).hexdigest()
                response = await self._session.request(
                    method,
                    url,
                    auth=DigestAuth(self._user, self.password),
                    content=content,
                    timeout=timeout,
                )
            response.raise_for_status()
        except HTTPStatusError as e:
            if e.response.status_code == HTTPStatus.UNAUTHORIZED:
                raise DevicePasswordProtected from None
            raise
        except (ConnectTimeout, ConnectError, ReadTimeout, RemoteProtocolError):
            raise DeviceUnavailable from None
        else:
            return response