AngellusMortis/sxm-discord

View on GitHub
sxm_discord/models.py

Summary

Maintainability
A
0 mins
Test Coverage
from datetime import datetime, timezone
from typing import Any, List, Optional, Tuple, Union

from discord import Client, Embed, FFmpegOpusAudio, Game, Message, errors
from discord.channel import DMChannel, GroupChannel, TextChannel
from discord_slash import SlashContext  # type: ignore
from humanize import naturaltime  # type: ignore
from pydantic import BaseModel  # pylint: disable=no-name-in-module
from sxm.models import XMChannel, XMCutMarker, XMLiveChannel, XMSong
from sxm_player.models import Episode, PlayerState, Song

from .utils import (
    generate_embed_from_archived,
    generate_embed_from_cut,
    get_art_url_by_size,
    send_message,
)


class QueuedItem(BaseModel):
    """Pydantic model describing one queued entry and its audio source.

    Every field is optional here; the subclasses in this module make one of
    ``audio_file`` / ``stream_data`` mandatory.
    """

    # Archived Song or Episode to play, if any.
    audio_file: Union[Song, Episode, None] = None
    # Channel paired with a string for a live stream
    # NOTE(review): the string is presumably a stream URL -- confirm with callers.
    stream_data: Optional[Tuple[XMChannel, str]] = None

    # FFmpeg Opus audio source associated with this entry, if one has been set.
    source: Optional[FFmpegOpusAudio] = None

    class Config:
        # Required so pydantic accepts non-pydantic field types such as
        # FFmpegOpusAudio.
        arbitrary_types_allowed = True


class ArchivedQueuedItem(QueuedItem):
    """Queued entry for which ``audio_file`` is required (a Song or Episode)."""

    # Overrides the optional base field: no default, and None is not allowed.
    audio_file: Union[Song, Episode]


class SXMQueuedItem(QueuedItem):
    """Queued entry for which ``stream_data`` is required."""

    # Overrides the optional base field: no default, and None is not allowed.
    stream_data: Tuple[XMChannel, str]


class SongActivity(Game):
    """Discord ``Game`` activity whose name and details describe a ``Song``.

    ``Game.__init__`` is not invoked; the initializer assigns the activity
    attributes directly and then delegates to ``update_status``.
    """

    def __init__(
        self,
        song: Song,
        **kwargs,
    ):
        """Build the activity for ``song``.

        Recognized optional keyword arguments (each falls back to the default
        shown): ``assets`` ({}), ``party`` ({}), ``application_id`` (None),
        ``url`` (None), ``flags`` (0), ``sync_id`` (None), ``session_id``
        (None). Any other keyword arguments are ignored.
        """

        # Both timestamps start zeroed for a plain song activity.
        self._start = 0.0
        self._end = 0.0

        self.assets = kwargs.pop("assets", {})
        self.party = kwargs.pop("party", {})
        self.application_id = kwargs.pop("application_id", None)
        self.url = kwargs.pop("url", None)
        self.flags = kwargs.pop("flags", 0)
        self.sync_id = kwargs.pop("sync_id", None)
        self.session_id = kwargs.pop("session_id", None)

        self.update_status(song)

    def update_status(
        self, song: Optional[Song], state: str = "Playing music", name_suffix: str = ""
    ) -> None:
        """Refresh the state, name, details and artwork of this activity.

        With no song, the name and details are just ``name_suffix``. With a
        song, they become the song's pretty name followed by ``name_suffix``
        and the large image URL is taken from the song; the large image text
        is only set when the song has an album.
        """

        self.state = state

        if song is None:
            self.name = self.details = name_suffix
            return

        self.name = self.details = song.pretty_name + name_suffix
        self.large_image_url = song.image_url

        album = song.album
        if album is not None:
            self.large_image_text = f"{album} by {song.artist}"


class SXMActivity(SongActivity):
    """Activity describing what is currently playing on a live SXM channel.

    ``SongActivity.__init__`` is not invoked; the same attributes are
    assigned directly here, with ``_start`` derived from ``start``.
    """

    def __init__(
        self,
        start: Optional[datetime],
        radio_time: Optional[datetime],
        channel: XMChannel,
        live_channel: XMLiveChannel,
        **kwargs,
    ):
        """Build the activity from the live channel data.

        ``start`` is stored as a millisecond timestamp (0.0 when missing).
        ``radio_time`` is forwarded as ``now`` when looking up the latest cut
        and episode. The optional keyword arguments ``assets``, ``party``,
        ``application_id``, ``url``, ``flags``, ``sync_id`` and ``session_id``
        are stored on the instance; anything else is ignored.
        """

        # datetime.timestamp() yields seconds; scale to milliseconds.
        self._start = 0.0 if start is None else start.timestamp() * 1000.0
        self._end = 0.0

        self.assets = kwargs.pop("assets", {})
        self.party = kwargs.pop("party", {})
        self.application_id = kwargs.pop("application_id", None)
        self.url = kwargs.pop("url", None)
        self.flags = kwargs.pop("flags", 0)
        self.sync_id = kwargs.pop("sync_id", None)
        self.session_id = kwargs.pop("session_id", None)

        station = f"SXM {channel.pretty_name}"
        song = self.create_song(channel, live_channel, radio_time)

        if song is not None:
            # The song's pretty name is prepended by update_status.
            name_suffix = f" on {station}"
        else:
            # No song cut: fall back to the episode title, or the bare station.
            episode = live_channel.get_latest_episode(now=radio_time)
            name_suffix = (
                station
                if episode is None
                else f'"{episode.episode.long_title}" on {station}'
            )

        self.update_status(
            song,
            state="Playing music from SXM",
            name_suffix=name_suffix,
        )

    def create_song(
        self,
        channel: XMChannel,
        live_channel: XMLiveChannel,
        radio_time: Optional[datetime],
    ) -> Optional[Song]:
        """Build a ``Song`` from the latest cut of the live channel.

        Returns ``None`` when there is no latest cut or the cut is not an
        ``XMSong``. The artist is the first entry of the cut's artists and the
        image URL is the album's "MEDIUM" art (``None`` without an album).
        """

        marker = live_channel.get_latest_cut(now=radio_time)
        if marker is None or not isinstance(marker.cut, XMSong):
            return None

        art_url = None
        if marker.cut.album is not None:
            art_url = get_art_url_by_size(marker.cut.album.arts, "MEDIUM")

        return Song(
            guid="",
            title=marker.cut.title,
            artist=marker.cut.artists[0].name,
            air_time=marker.time,
            channel=channel.id,
            file_path="",
            image_url=art_url,
        )


class ReactionCarousel(BaseModel):
    """Base model for a Discord message paged through with arrow reactions.

    ``index`` selects the entry of ``items`` currently shown. Subclasses must
    implement ``get_message_kwargs`` and ``update_message``.
    """

    items: List[Any]
    index: int = 0
    # Set by ``update`` to the (naive) time of the last refresh.
    last_update: Optional[datetime] = None
    # Discord message displaying the carousel; None until it has been sent.
    message: Optional[Message] = None

    class Config:
        # Required so pydantic accepts the discord ``Message`` type.
        arbitrary_types_allowed = True

    @property
    def current(self):
        """Return the item at the current ``index``."""
        return self.items[self.index]

    def get_message_kwargs(self, state: PlayerState) -> dict:
        """Return the keyword arguments used to send or update the message.

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError()

    async def update_message(
        self, message: Optional[str] = None, embed: Optional[Embed] = None
    ):
        """Edit the already-sent message.

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError()

    async def refresh_message(self, client: Client):
        """Re-fetch ``message`` through ``client``.

        ``message`` is reset to ``None`` when its channel cannot be resolved
        or the message is no longer found.
        """
        if self.message is None:
            return

        channel_id = self.message.channel.id
        channel_type = Union[TextChannel, DMChannel, GroupChannel, None]
        channel: channel_type = client.get_channel(channel_id)  # type: ignore
        if channel is None:
            self.message = None
            return

        try:
            self.message = await channel.fetch_message(self.message.id)
        except errors.NotFound:
            self.message = None

    async def clear_reactions(self):
        """Clear every reaction currently recorded on ``message``."""
        if self.message is None:
            return

        for reaction in self.message.reactions:
            await reaction.clear()

    async def handle_reaction(self, state: PlayerState, emoji: str):
        """Move one step left or right for an arrow emoji, then redraw.

        The index is clamped to the valid range of ``items``; any other emoji
        leaves it unchanged.
        """
        if emoji == "⬅️":
            self.index = max(0, self.index - 1)
        elif emoji == "➡️":
            # The last valid position is len - 1; stepping to len would make
            # ``current`` raise IndexError. Floor at 0 for an empty list.
            last_index = max(0, len(self.items) - 1)
            self.index = min(last_index, self.index + 1)

        await self.update(state)

    async def update(self, state: PlayerState, ctx: Optional[SlashContext] = None):
        """Send or edit the carousel message and reset its arrow reactions.

        When no message exists yet it is sent through ``send_message`` with
        ``ctx``; otherwise the existing one is edited via ``update_message``.
        An arrow is only added for a direction that has further items.
        """
        if self.message is None:
            self.message = await send_message(ctx, **self.get_message_kwargs(state))
        else:
            await self.update_message(**self.get_message_kwargs(state))

        await self.clear_reactions()

        if self.index > 0:
            await self.message.add_reaction("⬅️")
        if self.index < (len(self.items) - 1):
            await self.message.add_reaction("➡️")

        self.last_update = datetime.now()


class SXMCutCarousel(ReactionCarousel):
    """Carousel over recent cut markers of an SXM channel."""

    items: List[XMCutMarker]
    latest: XMCutMarker
    channel: XMChannel
    body: str

    @property
    def current(self) -> XMCutMarker:
        """Return the cut marker at the current index."""
        return super().current

    def _get_footer(self, state: PlayerState):
        """Build the embed footer: "Now Playing" or an approximate age."""
        position = f"{self.index+1}/{len(self.items)} Recent Songs"

        if self.current == self.latest:
            return f"Now Playing | {position}"

        # Prefer the player's radio time; otherwise use the current UTC time.
        reference = state.radio_time or datetime.now(timezone.utc)
        age = naturaltime(reference - self.current.time)

        return f"About {age} ago | {position}"

    async def update_message(
        self, message: Optional[str] = None, embed: Optional[Embed] = None
    ):
        """Replace the embed of the sent message; ``message`` is not used."""
        if self.message is None or embed is None:
            return

        await self.message.edit(embed=embed)

    def get_message_kwargs(self, state: PlayerState) -> dict:
        """Return the message body and an embed for the current cut.

        Raises:
            ValueError: when ``state.live`` is ``None``.
        """
        if state.live is None:
            raise ValueError("Nothing is playing")

        # The episode is looked up at the time of the latest cut.
        episode = state.live.get_latest_episode(self.latest.time)
        embed = generate_embed_from_cut(
            self.channel,
            self.current,
            episode,
            footer=self._get_footer(state),
        )

        return {"message": self.body, "embed": embed}


class ArchivedSongCarousel(ReactionCarousel):
    """Carousel over archived songs and episodes."""

    items: List[Union[Song, Episode]]
    body: str

    @property
    def current(self) -> Union[Song, Episode]:
        """Return the item at the current index.

        Annotated with the element type of ``items``, which may hold either a
        ``Song`` or an ``Episode``.
        """
        return super().current

    def _get_footer(self):
        """Build the embed footer: the item's GUID and its position."""
        return f"GUID: {self.current.guid} | {self.index+1}/{len(self.items)} Songs"

    async def update_message(
        self, message: Optional[str] = None, embed: Optional[Embed] = None
    ):
        """Replace the embed of the sent message; ``message`` is not used."""
        if self.message is not None and embed is not None:
            await self.message.edit(embed=embed)

    def get_message_kwargs(self, state: PlayerState) -> dict:
        """Return the message body and an embed for the current item.

        ``state`` is accepted for interface compatibility and is not read.
        """
        return {
            "message": self.body,
            "embed": generate_embed_from_archived(
                self.current, footer=self._get_footer()
            ),
        }


class UpcomingSongCarousel(ArchivedSongCarousel):
    """Archived-item carousel whose footer shows the distance from ``latest``."""

    latest: Union[Song, Episode, None] = None

    def _get_footer(self):
        """Build the footer: "Playing Next" for ``latest``, else "<n> Away"."""
        position = self.index + 1
        is_next = self.current == self.latest
        prefix = "Playing Next" if is_next else f"{position} Away"

        return f"{prefix} | {position}/{len(self.items)} Songs"