AngellusMortis/sxm-player

View on GitHub
sxm_player/queue.py

Summary

Maintainability
A
0 mins
Test Coverage
import multiprocessing.queues as mpq
import time
from enum import Enum, auto
from multiprocessing import get_context
from queue import Empty, Full
from typing import Any, Optional

# Default value, in seconds, of the ``timeout`` parameter of
# ``Queue.safe_get()`` and ``Queue.safe_put()``.
DEFAULT_POLLING_TIMEOUT = 0.02


class EventTypes(Enum):
    """Kinds of event an ``EventMessage`` can carry in its ``msg_type`` field.

    Member values are generated by ``auto()``, so they follow declaration
    order; inserting or reordering members changes the underlying integers.
    Compare against the members themselves rather than their raw values.
    """

    RESET_SXM = auto()
    SXM_STATUS = auto()
    UPDATE_CHANNELS = auto()
    UPDATE_METADATA = auto()
    HLS_STREAM_STARTED = auto()
    HLS_STDERROR_LINES = auto()
    TRIGGER_HLS_STREAM = auto()
    KILL_HLS_STREAM = auto()
    DEBUG_START_PLAYER = auto()
    DEBUG_STOP_PLAYER = auto()


class EventMessage:
    """A single event passed through a ``Queue``.

    Each message records where it came from, what kind of event it is, and an
    arbitrary payload.
    """

    # ``time.monotonic()`` reading taken when the message was constructed.
    id: float  # noqa: A003
    # Identifier of the message source, as supplied by the creator.
    msg_src: str
    # Optional relay identifier; ``None`` when the creator did not supply one.
    msg_relay: Optional[str]
    # Kind of event this message represents.
    msg_type: EventTypes
    # Payload; its shape depends on ``msg_type`` and is not constrained here.
    msg: Any

    def __init__(
        self,
        msg_src: str,
        msg_type: EventTypes,
        msg: Any,
        msg_relay: Optional[str] = None,
    ) -> None:
        """Create a message and stamp it with the current monotonic time.

        Args:
            msg_src: Identifier of the message source.
            msg_type: Kind of event being reported or requested.
            msg: Payload associated with the event.
            msg_relay: Optional relay identifier; defaults to ``None``.
        """
        self.id = time.monotonic()
        self.msg_src = msg_src
        self.msg_relay = msg_relay
        self.msg_type = msg_type
        self.msg = msg

    def __str__(self) -> str:
        """Return ``"<msg_src> - <msg_type>: <msg>"`` for log output."""
        return f"{self.msg_src} - {self.msg_type}: {self.msg}"

    def __repr__(self) -> str:
        """Return an unambiguous representation listing every field."""
        return (
            f"{type(self).__name__}(id={self.id!r}, msg_src={self.msg_src!r}, "
            f"msg_relay={self.msg_relay!r}, msg_type={self.msg_type!r}, "
            f"msg={self.msg!r})"
        )


class Queue(mpq.Queue):
    """Multiprocessing queue with non-raising ``safe_*`` helpers.

    ``multiprocessing.Queue`` is a *method* that returns an ``mpq.Queue``
    object, so it cannot be subclassed directly; this class subclasses the
    real implementation instead.  That object requires a context for proper
    operation, so ``__init__`` supplies one.  See:
    https://stackoverflow.com/questions/39496554/cannot-subclass-multiprocessing-queue-in-python-3-5
    """

    def __init__(self, *args, **kwargs):
        """Create the queue, defaulting ``ctx`` to the current context.

        All arguments are forwarded to ``multiprocessing.queues.Queue``.  A
        caller-supplied ``ctx`` keyword is honored; if it is absent (or
        ``None``) the default context from ``get_context()`` is used.
        """
        if kwargs.get("ctx") is None:
            kwargs["ctx"] = get_context()
        super().__init__(*args, **kwargs)

    def safe_get(
        self, timeout: Optional[float] = DEFAULT_POLLING_TIMEOUT
    ) -> Optional[EventMessage]:
        """Return the next item, or ``None`` if none arrives in time.

        Args:
            timeout: Seconds to wait for an item.  ``None`` means do not
                wait at all (non-blocking get).

        Returns:
            The item taken from the queue, or ``None`` when the queue stayed
            empty for the whole wait.
        """
        try:
            if timeout is None:
                return self.get(block=False)
            return self.get(block=True, timeout=timeout)
        except Empty:
            return None

    def safe_put(
        self,
        item: EventMessage,
        timeout: Optional[float] = DEFAULT_POLLING_TIMEOUT,
    ) -> bool:
        """Put ``item`` on the queue, reporting failure instead of raising.

        Args:
            item: Object to enqueue.
            timeout: Seconds to wait for a free slot.  ``None`` means do not
                wait at all (non-blocking put).

        Returns:
            ``True`` if the item was enqueued, ``False`` if the queue was
            still full when the wait ended.
        """
        try:
            if timeout is None:
                self.put(item, block=False)
            else:
                # ``put`` ignores ``timeout`` when ``block`` is false, so the
                # call must block for the timeout to have any effect.
                self.put(item, block=True, timeout=timeout)
            return True
        except Full:
            return False

    def drain(self):
        """Yield queued items until ``safe_get()`` reports an empty queue.

        Each fetch waits up to the default polling timeout.  Only ``None``
        ends the iteration, so falsy items such as ``0`` or ``""`` are
        yielded like any other; a literal ``None`` item is indistinguishable
        from "empty" and also ends it.
        """
        while True:
            item = self.safe_get()
            if item is None:
                return
            yield item

    def safe_close(self) -> int:
        """Drain the queue, close it, and join its feeder thread.

        Returns:
            The number of items that were still queued and got discarded.
        """
        num_left = sum(1 for __ in self.drain())
        self.close()
        self.join_thread()
        return num_left