AngellusMortis/sxm-player

View on GitHub
sxm_player/cli.py

Summary

Maintainability
A
0 mins
Test Coverage
# -*- coding: utf-8 -*-

"""Console script for sxm_player."""
import os
from multiprocessing import set_start_method
from pathlib import Path
from typing import Optional, Type

import psutil
import typer
from sxm import QualitySize, RegionChoice
from sxm.cli import (
    OPTION_HOST,
    OPTION_PASSWORD,
    OPTION_PORT,
    OPTION_PRECACHE,
    OPTION_QUALITY,
    OPTION_REGION,
    OPTION_USERNAME,
    OPTION_VERBOSE,
)

from sxm_player import handlers
from sxm_player.command import validate_player
from sxm_player.models import PlayerState
from sxm_player.players import BasePlayer
from sxm_player.queue import EventMessage, EventTypes
from sxm_player.runner import Runner
from sxm_player.utils import ACTIVE_PROCESS_STATUSES
from sxm_player.workers import ServerWorker, StatusWorker

OPTION_CONFIG_FILE = typer.Option(
    None,
    "-c",
    "--config-file",
    exists=True,
    file_okay=True,
    dir_okay=False,
    readable=True,
    resolve_path=True,
    help="Config file to read vars from",
)
OPTION_LOG_FILE = typer.Option(
    None,
    "-l",
    "--log-file",
    file_okay=True,
    resolve_path=True,
    dir_okay=False,
    readable=True,
    help="Output log file",
)
OPTION_OUTPUT_FOLDER = typer.Option(
    None,
    "-o",
    "--output-folder",
    file_okay=False,
    dir_okay=True,
    readable=True,
    writable=True,
    resolve_path=True,
    envvar="SXM_OUTPUT_FOLDER",
    help="output folder to save stream off to as it plays them",
)
OPTION_RESET_SONGS = typer.Option(
    False,
    "-R",
    "--reset-songs",
    help="Reset processed song database",
)
ARG_PLAYER_CLASS = typer.Argument(
    None,
    callback=validate_player,
    help="Optional Player Class to use",
    envvar="SXM_PLAYER_CLASS",
)


def main(
    config_file: Optional[Path] = OPTION_CONFIG_FILE,
    log_file: Optional[Path] = OPTION_LOG_FILE,
    verbose: bool = OPTION_VERBOSE,
    username: str = OPTION_USERNAME,
    password: str = OPTION_PASSWORD,
    region: RegionChoice = OPTION_REGION,
    quality: QualitySize = OPTION_QUALITY,
    port: int = OPTION_PORT,
    host: str = OPTION_HOST,
    output_folder: Optional[Path] = OPTION_OUTPUT_FOLDER,
    reset_songs: bool = OPTION_RESET_SONGS,
    precache: bool = OPTION_PRECACHE,
    player_class: Optional[str] = ARG_PLAYER_CLASS,
):
    """Command line interface for sxm-player"""

    if verbose:
        set_start_method("spawn")

    os.system("/usr/bin/clear")  # nosec

    klass: Optional[Type[BasePlayer]] = None
    if player_class is not None:
        klass = player_class  # type: ignore

    with Runner(log_file, verbose) as runner:
        state = PlayerState()

        runner.create_worker(
            StatusWorker,
            StatusWorker.NAME,
            port=port,
            ip=host,
            sxm_status=state.sxm_running,
        )

        if klass is not None:
            worker_args = klass.get_worker_args(**locals())
            if worker_args is not None:
                state.player_name = worker_args[1]
                runner.create_worker(worker_args[0], worker_args[1], **(worker_args[2]))

        while not runner.shutdown_event.is_set():
            event_loop(**locals())

    return 0


def spawn_sxm_worker(
    runner: Runner,
    host: str,
    port: int,
    username: str,
    password: str,
    region: RegionChoice,
    quality: QualitySize,
    precache: bool,
    **kwargs,
):
    runner.create_worker(
        ServerWorker,
        ServerWorker.NAME,
        port=port,
        ip=host,
        username=username,
        password=password,
        region=region,
        quality=quality,
        precache=precache,
    )


def event_loop(runner: Runner, state: PlayerState, **kwargs):
    if not state.is_connected:
        if state.mark_attempt(runner.log):
            spawn_sxm_worker(runner, **kwargs)

    event = runner.event_queue.safe_get()

    if not event:
        return

    runner.log.debug(f"Received event: {event.msg_src}, {event.msg_type.name}")

    was_connected: Optional[bool] = None
    if event.msg_src == ServerWorker.NAME:
        was_connected = state.is_connected

    handle_event(event=event, runner=runner, state=state, **kwargs)

    if was_connected is False and state.is_connected:
        if not was_connected and state.is_connected:
            runner.log.info(
                f"SXM Client started. {len(state.channels)} channels available"
            )

            state.sxm_running = True
            handlers.sxm_status_event(runner, EventTypes.SXM_STATUS, state.sxm_running)

    check_player(runner, state)


def handle_event(event: EventMessage, **kwargs):
    runner = kwargs["runner"]
    debug = kwargs["verbose"]
    event_name = event.msg_type.name.lower()
    is_debug_event = event_name.startswith("debug")
    handler_name = f"handle_{event_name}_event"

    if hasattr(handlers, handler_name) and (not is_debug_event or debug):
        getattr(handlers, handler_name)(event, **kwargs)
    else:
        runner.log.warning(f"Unknown event received: {event.msg_src}, {event.msg_type}")


def check_player(runner: Runner, state: PlayerState):
    if state.player_name is not None:
        player = runner.workers.get(state.player_name)
        running = True

        if player is None:
            running = False
        else:
            process = psutil.Process(player.process.pid)
            if process.status() not in ACTIVE_PROCESS_STATUSES:
                running = False

        if not running:
            runner.log.info("Player has stopped, shutting down")
            runner.shutdown_event.set()