AngellusMortis/sxm-player

View on GitHub
sxm_player/utils.py

Summary

Maintainability
A
0 mins
Test Coverage
import logging
import os
import select
import shlex
import subprocess  # nosec
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Union

import coloredlogs  # type: ignore
import psutil
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.session import Session
from sxm.models import XMArt, XMImage

from sxm_player.models import DBEpisode, DBSong

# psutil status values that FFmpeg.check_process treats as "still alive".
ACTIVE_PROCESS_STATUSES = [
    psutil.STATUS_RUNNING,
    psutil.STATUS_SLEEPING,
    psutil.STATUS_DISK_SLEEP,
]
# strftime/strptime pattern shared by create_fs_datetime and from_fs_datetime.
FS_DATETIME_FORMAT = "%Y%m%d-%H%M%S%z"


# Names of third-party loggers whose level configure_root_logger sets to INFO.
unrelated_loggers = [
    "discord.client",
    "discord.gateway",
    "discord.http",
    "urllib3.connectionpool",
    "websockets.protocol",
]

# Module-level logger used by the helpers in this file.
logger = logging.getLogger("sxm_player.utils")


def init_db(
    base_folder: str,
    cleanup: Optional[bool] = True,
    reset: Optional[bool] = False,
) -> Session:
    """Initializes song database connection

    Creates ``base_folder`` if needed, opens (or creates) the SQLite file
    ``songs.db`` inside it, and makes sure all tables declared on the
    models' ``Base`` exist.

    Args:
        base_folder: directory that holds ``songs.db``.
        cleanup: when truthy, delete every ``DBSong`` / ``DBEpisode`` row
            whose ``file_path`` no longer exists on disk.
        reset: when truthy, remove an existing ``songs.db`` before opening
            so the database starts out empty.

    Returns:
        A new SQLAlchemy session bound to the database engine.
    """

    from .models import Base

    os.makedirs(base_folder, exist_ok=True)

    song_db = os.path.join(base_folder, "songs.db")

    if reset and os.path.exists(song_db):
        logger.info("Reseting database...")
        os.remove(song_db)

    db_engine = create_engine(f"sqlite:///{song_db}")
    Base.metadata.create_all(db_engine)
    db_session = sessionmaker(bind=db_engine)()

    if cleanup:
        removed = 0
        # Songs first, then episodes: same order as the rows are reported.
        for model in (DBSong, DBEpisode):
            for record in db_session.query(model).all():
                if not os.path.exists(record.file_path):
                    removed += 1
                    db_session.delete(record)

        # Only touch the database when something was actually deleted.
        if removed > 0:
            # Logger.warn is a deprecated alias; warning() is the supported API.
            logger.warning(f"deleted missing songs/shows: {removed}")
            db_session.commit()

    logger.info("Database initalized")
    return db_session


def get_art_url_by_size(arts: List[XMArt], size: str) -> Optional[str]:
    """Return the URL of the first ``XMImage`` in ``arts`` with the given size.

    Entries that are not ``XMImage`` instances, or whose ``size`` is ``None``,
    are skipped. Returns ``None`` when nothing matches.
    """
    matching_urls = (
        candidate.url
        for candidate in arts
        if isinstance(candidate, XMImage)
        and candidate.size is not None
        and candidate.size == size
    )
    # next() stops at the first match, mirroring an early return from a loop.
    return next(matching_urls, None)


def get_art_thumb_url(arts: List[XMArt]) -> Optional[str]:
    """Pick a thumbnail URL from ``arts``.

    Only square ``XMImage`` entries whose height is strictly between 100 and
    200 are considered. An image named ``"show logo on dark"`` wins
    immediately; otherwise the last qualifying image named ``"image"`` is
    used. Returns ``None`` when no candidate qualifies.
    """
    fallback: Optional[str] = None

    for candidate in arts:
        if not isinstance(candidate, XMImage) or candidate.height is None:
            continue

        thumb_sized = 100 < candidate.height < 200
        if not (thumb_sized and candidate.height == candidate.width):
            continue

        # logo on dark is what we really want
        if candidate.name == "show logo on dark":
            return candidate.url

        # but it is not always there, so remember a fallback image
        if candidate.name == "image":
            fallback = candidate.url

    return fallback


def get_files(folder: str) -> List[str]:
    """Return the names of the regular files directly inside ``folder``.

    Sub-directories and other non-file entries are left out. The returned
    names are relative to ``folder`` (not joined with it) and follow the
    order produced by ``os.listdir``.
    """

    return [
        entry
        for entry in os.listdir(folder)
        if os.path.isfile(os.path.join(folder, entry))
    ]


def splice_file(
    input_file: str, output_file: str, start_time: int, end_time: int
) -> Union[str, None]:
    """Splices a chunk off of the input file and saves it

    Runs ``ffmpeg`` with stream copy (``-acodec copy``) to write the part of
    ``input_file`` between ``start_time`` and ``end_time`` (passed to
    ffmpeg's ``-ss`` / ``-to``) into ``output_file``, overwriting it if it
    already exists (``-y``).

    Args:
        input_file: path of the file to cut from.
        output_file: path of the file to create; its parent directory is
            created when missing.
        start_time: value handed to ffmpeg's ``-ss`` option.
        end_time: value handed to ffmpeg's ``-to`` option.

    Returns:
        ``output_file`` on success, ``None`` when ffmpeg exits with a
        non-zero status.
    """

    # Build argv directly instead of formatting a string and re-splitting it:
    # paths containing quotes or backslashes then reach ffmpeg unchanged and
    # can never be broken up into extra arguments.
    args = [
        "ffmpeg",
        "-y",
        "-i",
        input_file,
        "-acodec",
        "copy",
        "-ss",
        str(start_time),
        "-to",
        str(end_time),
        "-loglevel",
        "fatal",
        output_file,
    ]

    # dirname is "" for a bare file name, and os.makedirs("") raises.
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    try:
        subprocess.run(args, check=True)  # nosec
    except subprocess.CalledProcessError as e:
        logger.error(f"failed to split file: {e}")
        return None
    else:
        logger.info(f"spliced file: {output_file}")
        return output_file


def create_fs_datetime(dt):
    """Format ``dt`` with ``FS_DATETIME_FORMAT`` and return the string.

    NOTE(review): the format ends in ``%z``; confirm callers pass
    timezone-aware datetimes if the result must round-trip through
    ``from_fs_datetime``.
    """
    rendered = dt.strftime(FS_DATETIME_FORMAT)
    return rendered


def from_fs_datetime(dt_string):
    """Parse ``dt_string`` using ``FS_DATETIME_FORMAT`` into a ``datetime``.

    Any ``ValueError`` raised by ``datetime.strptime`` for a string that
    does not match the format propagates to the caller.
    """
    parsed = datetime.strptime(dt_string, FS_DATETIME_FORMAT)
    return parsed


def configure_root_logger(level: str, log_file: Optional[Path] = None):
    """Set up the root logger and quiet the noisy third-party loggers.

    Handlers are only installed when the root logger has none yet: an
    optional file handler writing to ``log_file`` (added first), followed by
    ``coloredlogs.install`` on the root logger. Independently of that, every
    logger named in ``unrelated_loggers`` is set to ``INFO``.

    Args:
        level: log level applied to the file handler and to coloredlogs.
        log_file: optional path of a log file; no file handler when ``None``.
    """
    root = logging.getLogger()

    if not root.handlers:
        if log_file is not None:
            file_handler = logging.FileHandler(log_file)
            file_format = logging.Formatter(
                "%(asctime)s %(name)s[%(process)d] %(levelname)s %(message)s"
            )
            file_handler.setLevel(level)
            file_handler.setFormatter(file_format)
            root.addHandler(file_handler)
        coloredlogs.install(level=level, logger=root)

    # Applied on every call, even when handlers were already configured.
    for noisy_name in unrelated_loggers:
        logging.getLogger(noisy_name).setLevel(logging.INFO)


class FFmpeg:
    """Starts, monitors and stops a single ffmpeg child process.

    ``command`` has no default here, so it must be assigned (by a subclass
    or on the instance) before ``start_ffmpeg`` is called.
    """

    # Full command line; split with shlex before it is executed.
    command: str
    # Handle of the child process, or None when none is being tracked.
    process: Optional[subprocess.Popen] = None

    # Poll object watching the child's stderr pipe for readable data.
    _stderr_poll: Optional[select.poll] = None

    def start_ffmpeg(self) -> None:
        """Launch ``command`` with stderr piped and register it for polling."""
        ffmpeg_args = shlex.split(self.command)

        self.process = subprocess.Popen(ffmpeg_args, stderr=subprocess.PIPE)  # nosec

        self._stderr_poll = select.poll()

        if self.process.stderr is not None:
            self._stderr_poll.register(self.process.stderr, select.POLLIN)

    def check_process(self) -> bool:
        """Return True while the tracked child is in an active psutil state."""
        if self.process is None:
            return False

        # An exited child is never active. Checking Popen first also avoids
        # asking psutil about a PID that has been reaped (NoSuchProcess) or
        # recycled by an unrelated process.
        if self.process.poll() is not None:
            return False

        try:
            status = psutil.Process(self.process.pid).status()
        except psutil.NoSuchProcess:
            # The child vanished between poll() and the psutil lookup.
            return False

        return status in ACTIVE_PROCESS_STATUSES

    def stop_ffmpeg(self) -> None:
        """Kill the child (if any), reap it and release its stderr pipe."""
        if self.process is None:
            return

        process = self.process
        process.kill()
        if process.poll() is None:
            # Drains stderr and waits for the child to exit.
            process.communicate()

        # communicate() closes the pipe, but it is skipped when the child had
        # already exited; close explicitly so the descriptor is not leaked.
        if process.stderr is not None:
            process.stderr.close()

        self.process = None
        # The poll object only referenced the pipe that was just closed.
        self._stderr_poll = None

    def read_errors(self) -> List[str]:
        """Return the stderr lines that are currently available.

        Returns an empty list when no process is tracked or nothing is
        readable. Undecodable bytes are replaced rather than raising.
        """
        if self.process is None or self._stderr_poll is None:
            return []

        stderr = self.process.stderr
        if stderr is None:
            return []

        lines: List[str] = []
        # poll() takes its timeout in milliseconds, so this barely blocks.
        while self._stderr_poll.poll(0.1):
            raw_line = stderr.readline()
            if not raw_line:
                # EOF: the child closed stderr. poll() keeps reporting the
                # hung-up pipe, so without this the loop would never end.
                break
            lines.append(raw_line.decode("utf8", errors="replace"))

        return lines