endremborza/atqo

View on GitHub
atqo/bases.py

Summary

Maintainability
A
0 mins
Test Coverage
from abc import ABC, abstractmethod
from asyncio import Future
from functools import cached_property
from typing import TYPE_CHECKING

from structlog import get_logger

if TYPE_CHECKING:
    from .core import SchedulerTask  # pragma: no cover


class TaskPropertyBase:  # pragma: no cover
    # TODO
    def __repr__(self):
        param_str = ", ".join(f"{k}={v}" for k, v in self.__dict__.items())
        return f"{type(self).__name__}({param_str})"

    def __hash__(self):
        return id(self).__hash__()

    def __eq__(self, other):
        return self.__hash__() == other.__hash__()


class ActorBase(ABC):
    @abstractmethod
    def consume(self, task_arg):
        pass  # pragma: no cover

    def stop(self):
        """if any cleanup needed"""
        pass  # pragma: no cover

    @cached_property
    def _log(self):
        return get_logger(actor=type(self).__name__).info


class DistAPIBase(ABC):
    @property
    def exception(self):
        return Exception

    def join(self):
        """wait on all running tasks"""
        pass

    @staticmethod
    def kill(actor: ActorBase):
        actor.stop()

    @staticmethod
    def get_running_actor(actor_creator) -> ActorBase:
        return actor_creator()

    @staticmethod
    def get_future(actor: ActorBase, next_task: "SchedulerTask") -> Future:
        f = Future()
        f.set_result(actor.consume(next_task.argument))
        return f

    @staticmethod
    def parse_exception(e):
        return e