Cog-Creators/Red-DiscordBot

View on GitHub
redbot/cogs/audio/core/tasks/startup.py

Summary

Maintainability
A
0 mins
Test Coverage
import asyncio
import itertools
from pathlib import Path

from typing import Optional

import lavalink
from lavalink import NodeNotFound, PlayerNotFound
from red_commons.logging import getLogger

from redbot.core.data_manager import cog_data_path
from redbot.core.i18n import Translator
from redbot.core.utils import AsyncIter
from redbot.core.utils.dbtools import APSWConnectionWrapper

from ...apis.interface import AudioAPIInterface
from ...apis.playlist_wrapper import PlaylistWrapper
from ...errors import DatabaseError, TrackEnqueueError
from ..abc import MixinMeta
from ..cog_utils import _SCHEMA_VERSION, CompositeMetaClass

log = getLogger("red.cogs.Audio.cog.Tasks.startup")
_ = Translator("Audio", Path(__file__))


class StartUpTasks(MixinMeta, metaclass=CompositeMetaClass):
    def start_up_task(self):
        # There has to be a task since this requires the bot to be ready
        # If it waits for ready in startup, we cause a deadlock during initial load
        # as initial load happens before the bot can ever be ready.
        lavalink.set_logging_level(self.bot._cli_flags.logging_level)
        self.cog_init_task = asyncio.create_task(self.initialize())

    async def initialize(self) -> None:
        await self.bot.wait_until_red_ready()
        # Unlike most cases, we want the cache to exit before migration.
        try:
            self.db_conn = APSWConnectionWrapper(
                str(cog_data_path(self.bot.get_cog("Audio")) / "Audio.db")
            )
            self.api_interface = AudioAPIInterface(
                self.bot, self.config, self.session, self.db_conn, self.bot.get_cog("Audio")
            )
            self.playlist_api = PlaylistWrapper(self.bot, self.config, self.db_conn)
            await self.playlist_api.init()
            await self.api_interface.initialize()
            self.global_api_user = await self.api_interface.global_cache_api.get_perms()
            await self.data_schema_migration(
                from_version=await self.config.schema_version(), to_version=_SCHEMA_VERSION
            )
            await self.playlist_api.delete_scheduled()
            await self.api_interface.persistent_queue_api.delete_scheduled()
            await self._build_bundled_playlist()
            self.lavalink_restart_connect()
            self.player_automated_timer_task = asyncio.create_task(self.player_automated_timer())
        except Exception as exc:
            log.critical("Audio failed to start up, please report this issue.", exc_info=exc)
            return

        self.cog_ready_event.set()

    async def restore_players(self):
        log.debug("Starting new restore player task")
        tries = 0
        tracks_to_restore = await self.api_interface.persistent_queue_api.fetch_all()
        while not lavalink.get_all_nodes():
            await asyncio.sleep(1)
            log.trace("Waiting for node to be available")
            tries += 1
            if tries > 600:  # Give 10 minutes from node creation date.
                log.warning("Unable to restore players, couldn't connect to Lavalink node.")
                return
        try:
            for node in lavalink.get_all_nodes():
                if not node.ready:
                    log.trace("Waiting for node: %r", node)
                    await node.wait_until_ready(timeout=60)  # In theory this should be instant.
        except asyncio.TimeoutError:
            log.error(
                "Restoring player task aborted due to a timeout waiting for Lavalink node to be ready."
            )
            log.warning("Audio will attempt queue restore on next restart.")
            return
        metadata = {}
        all_guilds = await self.config.all_guilds()
        async for guild_id, guild_data in AsyncIter(all_guilds.items(), steps=100):
            if guild_data["auto_play"]:
                if guild_data["currently_auto_playing_in"]:
                    notify_channel, vc_id = guild_data["currently_auto_playing_in"]
                    metadata[guild_id] = (notify_channel, vc_id)
        if self.lavalink_connection_aborted:
            log.warning("Aborting player restore due to Lavalink connection being aborted.")
            return
        for guild_id, track_data in itertools.groupby(tracks_to_restore, key=lambda x: x.guild_id):
            await asyncio.sleep(0)
            tries = 0
            try:
                player: Optional[lavalink.Player] = None
                track_data = list(track_data)
                guild = self.bot.get_guild(guild_id)
                if not guild:
                    log.verbose(
                        "Skipping player restore - Bot is no longer in Guild (%s)", guild_id
                    )
                    continue
                persist_cache = self._persist_queue_cache.setdefault(
                    guild_id, await self.config.guild(guild).persist_queue()
                )
                if not persist_cache:
                    log.verbose(
                        "Skipping player restore - Guild (%s) does not have a persist cache",
                        guild_id,
                    )
                    await self.api_interface.persistent_queue_api.drop(guild_id)
                    continue
                try:
                    player = lavalink.get_player(guild_id)
                except (NodeNotFound, PlayerNotFound):
                    player = None
                vc = 0
                guild_data = await self.config.guild_from_id(guild.id).all()
                shuffle = guild_data["shuffle"]
                repeat = guild_data["repeat"]
                volume = guild_data["volume"]
                shuffle_bumped = guild_data["shuffle_bumped"]
                auto_deafen = guild_data["auto_deafen"]

                if player is None:
                    while tries < 5 and vc is not None:
                        try:
                            notify_channel_id, vc_id = metadata.pop(
                                guild_id, (None, track_data[-1].room_id)
                            )
                            vc = guild.get_channel(vc_id)
                            if not vc:
                                break
                            perms = vc.permissions_for(guild.me)
                            if not (perms.connect and perms.speak):
                                vc = None
                                break
                            player = await lavalink.connect(vc, self_deaf=auto_deafen)
                            player.store("notify_channel", notify_channel_id)
                            break
                        except NodeNotFound:
                            await asyncio.sleep(5)
                            tries += 1
                        except Exception as exc:
                            tries += 1
                            log.debug(
                                "Failed to restore music voice channel %s", vc_id, exc_info=exc
                            )
                            if vc is None:
                                break
                            else:
                                await asyncio.sleep(1)

                if tries >= 5 or vc is None or player is None:
                    if tries >= 5:
                        log.verbose(
                            "Skipping player restore - Guild (%s), 5 attempts to restore player failed.",
                            guild_id,
                        )
                    elif vc is None:
                        log.verbose(
                            "Skipping player restore - Guild (%s), VC (%s) does not exist.",
                            guild_id,
                            vc_id,
                        )
                    else:
                        log.verbose(
                            "Skipping player restore - Guild (%s), Unable to create player for VC (%s).",
                            guild_id,
                            vc_id,
                        )
                    await self.api_interface.persistent_queue_api.drop(guild_id)
                    continue

                player.repeat = repeat
                player.shuffle = shuffle
                player.shuffle_bumped = shuffle_bumped
                if player.volume != volume:
                    await player.set_volume(volume)
                for track in track_data:
                    track = track.track_object
                    player.add(guild.get_member(track.extras.get("requester")) or guild.me, track)
                player.maybe_shuffle()
                if not player.is_playing:
                    await player.play()
                log.debug("Restored %r", player)
            except Exception as exc:
                log.debug("Error restoring player in %s", guild_id, exc_info=exc)
                await self.api_interface.persistent_queue_api.drop(guild_id)

        for guild_id, (notify_channel_id, vc_id) in metadata.items():
            guild = self.bot.get_guild(guild_id)
            player: Optional[lavalink.Player] = None
            vc = 0
            tries = 0
            if not guild:
                continue
            if self.lavalink_connection_aborted:
                player = None
            else:
                try:
                    player = lavalink.get_player(guild_id)
                except (NodeNotFound, PlayerNotFound):
                    player = None
            if player is None:
                guild_data = await self.config.guild_from_id(guild.id).all()
                shuffle = guild_data["shuffle"]
                repeat = guild_data["repeat"]
                volume = guild_data["volume"]
                shuffle_bumped = guild_data["shuffle_bumped"]
                auto_deafen = guild_data["auto_deafen"]

                while tries < 5 and vc is not None:
                    try:
                        vc = guild.get_channel(vc_id)
                        if not vc:
                            break
                        perms = vc.permissions_for(guild.me)
                        if not (perms.connect and perms.speak):
                            vc = None
                            break
                        player = await lavalink.connect(vc, self_deaf=auto_deafen)
                        player.store("notify_channel", notify_channel_id)
                        break
                    except NodeNotFound:
                        await asyncio.sleep(5)
                        tries += 1
                    except Exception as exc:
                        tries += 1
                        log.debug("Failed to restore music voice channel %s", vc_id, exc_info=exc)
                        if vc is None:
                            break
                        else:
                            await asyncio.sleep(1)
                if tries >= 5 or vc is None or player is None:
                    if tries >= 5:
                        log.verbose(
                            "Skipping player restore - Guild (%s), 5 attempts to restore player failed.",
                            guild_id,
                        )
                    elif vc is None:
                        log.verbose(
                            "Skipping player restore - Guild (%s), VC (%s) does not exist.",
                            guild_id,
                            vc_id,
                        )
                    else:
                        log.verbose(
                            "Skipping player restore - Guild (%s), Unable to create player for VC (%s).",
                            guild_id,
                            vc_id,
                        )
                    continue

                player.repeat = repeat
                player.shuffle = shuffle
                player.shuffle_bumped = shuffle_bumped
                if player.volume != volume:
                    await player.set_volume(volume)
                player.maybe_shuffle()
                log.debug("Restored %r", player)
                if not player.is_playing:
                    notify_channel = player.fetch("notify_channel")
                    try:
                        await self.api_interface.autoplay(player, self.playlist_api)
                    except DatabaseError:
                        notify_channel = guild.get_channel_or_thread(notify_channel)
                        if notify_channel:
                            await self.send_embed_msg(
                                notify_channel, title=_("Couldn't get a valid track.")
                            )
                        return
                    except TrackEnqueueError:
                        notify_channel = guild.get_channel_or_thread(notify_channel)
                        if notify_channel:
                            await self.send_embed_msg(
                                notify_channel,
                                title=_("Unable to Get Track"),
                                description=_(
                                    "I'm unable to get a track from the Lavalink node at the moment, "
                                    "try again in a few minutes."
                                ),
                            )
                        return
        del metadata
        del all_guilds
        log.debug("Player restore task completed successfully")