AngellusMortis/sxm-player

View on GitHub
sxm_player/workers/base.py

Summary

Maintainability
A
0 mins
Test Coverage
import logging
import time
from datetime import datetime, timedelta
from multiprocessing import synchronize
from typing import List, Optional, Tuple

from ..models import PlayerState
from ..queue import EventMessage, EventTypes, Queue
from ..signals import default_signal_handler, init_signals, interupt_signal_handler

__all__ = [
    "BaseWorker",
    "InterruptableWorker",
    "LoopedWorker",
    "SXMStatusSubscriber",
    "HLSStatusSubscriber",
    "SXMLoopedWorker",
]


class BaseWorker:
    NAME = "worker"

    _log: logging.Logger

    name: str = NAME
    int_handler: staticmethod = staticmethod(default_signal_handler)
    term_handler: staticmethod = staticmethod(default_signal_handler)
    startup_event: synchronize.Event
    shutdown_event: synchronize.Event
    local_shutdown_event: synchronize.Event

    def __init__(
        self,
        startup_event: synchronize.Event,
        shutdown_event: synchronize.Event,
        local_shutdown_event: synchronize.Event,
        event_queue: Queue,
        name: str = "worker",
        *args,
        **kwargs,
    ):
        self._log = logging.getLogger(f"sxm_player.{name}")

        self.name = name

        self.startup_event = startup_event
        self.shutdown_event = shutdown_event
        self.local_shutdown_event = local_shutdown_event
        self.event_queue = event_queue

    def init_signals(self):
        self._log.debug("Entering init_signals")
        signal_object = init_signals(
            self.shutdown_event, self.int_handler, self.term_handler
        )
        return signal_object

    def start(self):
        self.init_signals()

        self.startup_event.set()
        return self.run()

    def run(self):
        raise NotImplementedError("run method not implemented")

    def push_event(self, event: EventMessage):
        success = self.event_queue.safe_put(event)

        if not success:
            self._log.error(f"Could not pass event: {event.msg_src}, {event.msg_type}")


class InterruptableWorker(BaseWorker):
    int_handler: staticmethod = staticmethod(interupt_signal_handler)
    term_handler: staticmethod = staticmethod(interupt_signal_handler)


class LoopedWorker(BaseWorker):
    _delay: float = 1

    def run(self):
        self.setup()

        while not self.shutdown_event.is_set():
            time.sleep(self._delay)
            self.loop()

        self.cleanup()

    def cleanup(self):
        pass

    def setup(self):
        pass

    def loop(self):
        raise NotImplementedError("loop method not implemented")


class SXMStatusSubscriber:
    sxm_status_queue: Queue

    def __init__(self, sxm_status_queue):
        self.sxm_status_queue = sxm_status_queue


class HLSStatusSubscriber:
    hls_stream_queue: Queue

    def __init__(self, hls_stream_queue):
        self.hls_stream_queue = hls_stream_queue


class EventedWorker(LoopedWorker):
    _last_loop: float = 0
    _event_queues: List[Queue]
    _event_delay: float = 0

    def run(self):
        self.setup()

        try:
            while (
                not self.shutdown_event.is_set()
                and not self.local_shutdown_event.is_set()
            ):
                for queue in self._event_queues:
                    event = queue.safe_get()

                    if event:
                        self._log.debug(
                            f"Received event: {event.msg_src}, "
                            f"{event.msg_type.name}"
                        )
                        self._handle_event(event)

                if time.monotonic() > (self._last_loop + self._delay):
                    self.loop()
                    self._last_loop = time.monotonic()
        except Exception as e:
            self._log.error(f"Exception occurred in {self.name}: {e}")

        self.cleanup()

    def _handle_event(self, event: EventMessage):
        raise NotImplementedError("_handle_event method not implemented")


class SXMLoopedWorker(EventedWorker, SXMStatusSubscriber):
    _state: PlayerState

    def __init__(self, sxm_status: bool, *args, **kwargs):
        sxm_status_queue = kwargs.pop("sxm_status_queue")
        SXMStatusSubscriber.__init__(self, sxm_status_queue)
        super().__init__(*args, **kwargs)

        self._state = PlayerState()
        self._state.sxm_running = sxm_status
        self._event_queues = [self.sxm_status_queue]

    def _handle_event(self, event: EventMessage):
        if event.msg_type == EventTypes.SXM_STATUS:
            self._state.sxm_running = event.msg
        else:
            self._log.warning(
                f"Unknown event received: {event.msg_src}, {event.msg_type}"
            )


class HLSLoopedWorker(EventedWorker, HLSStatusSubscriber):
    _state: PlayerState

    def __init__(
        self,
        stream_data: Tuple[Optional[str], Optional[str]] = (None, None),
        channels: Optional[List[dict]] = None,
        raw_live_data: Tuple[
            Optional[datetime], Optional[timedelta], Optional[dict]
        ] = (
            None,
            None,
            None,
        ),
        *args,
        **kwargs,
    ):
        hls_stream_queue = kwargs.pop("hls_stream_queue")
        HLSStatusSubscriber.__init__(self, hls_stream_queue)
        super().__init__(*args, **kwargs)

        self._event_queues = [self.hls_stream_queue]

        self._state = PlayerState()
        self._state.update_stream_data(stream_data)
        self._state.update_channels(channels)
        self._state.set_raw_live(raw_live_data)

    def _handle_event(self, event: EventMessage):
        if event.msg_type == EventTypes.HLS_STREAM_STARTED:
            self._state.update_stream_data(event.msg)
        elif event.msg_type == EventTypes.UPDATE_METADATA:
            self._state.set_raw_live(event.msg)
        elif event.msg_type == EventTypes.UPDATE_CHANNELS:
            self._state.update_channels(event.msg)
        elif event.msg_type == EventTypes.KILL_HLS_STREAM:
            self.local_shutdown_event.set()
        else:
            self._log.warning(
                f"Unknown event received: {event.msg_src}, {event.msg_type}"
            )


class ComboLoopedWorker(EventedWorker, SXMStatusSubscriber, HLSStatusSubscriber):
    _state: PlayerState

    def __init__(
        self,
        sxm_status: bool,
        stream_data: Tuple[Optional[str], Optional[str]],
        raw_live_data: Tuple[Optional[datetime], Optional[timedelta], Optional[dict]],
        *args,
        **kwargs,
    ):
        sxm_status_queue = kwargs.pop("sxm_status_queue")
        SXMStatusSubscriber.__init__(self, sxm_status_queue)

        hls_stream_queue = kwargs.pop("hls_stream_queue")
        HLSStatusSubscriber.__init__(self, hls_stream_queue)
        super().__init__(*args, **kwargs)

        self._event_queues = [self.hls_stream_queue, self.sxm_status_queue]

        self._state = PlayerState()
        self._state.sxm_running = sxm_status
        self._state.update_stream_data(stream_data)
        self._state.set_raw_live(raw_live_data)