Cog-Creators/Red-DiscordBot

View on GitHub
redbot/cogs/audio/apis/interface.py

Summary

Maintainability
A
0 mins
Test Coverage
import asyncio
import contextlib
import datetime
import json
import random
import time

from collections import namedtuple
from pathlib import Path
from typing import TYPE_CHECKING, Callable, List, MutableMapping, Optional, Tuple, Union, cast

import aiohttp
import discord
import lavalink
from red_commons.logging import getLogger

from lavalink.rest_api import LoadResult, LoadType
from redbot.core import Config, commands
from redbot.core.bot import Red
from redbot.core.commands import Cog, Context
from redbot.core.i18n import Translator
from redbot.core.utils import AsyncIter
from redbot.core.utils.dbtools import APSWConnectionWrapper

from ..audio_dataclasses import Query
from ..errors import DatabaseError, SpotifyFetchError, TrackEnqueueError, YouTubeApiError
from ..utils import CacheLevel, Notifier
from .api_utils import LavalinkCacheFetchForGlobalResult
from .global_db import GlobalCacheWrapper
from .local_db import LocalCacheWrapper
from .persist_queue_wrapper import QueueInterface
from .playlist_interface import get_playlist
from .playlist_wrapper import PlaylistWrapper
from .spotify import SpotifyWrapper
from .youtube import YouTubeWrapper

if TYPE_CHECKING:
    from .. import Audio

_ = Translator("Audio", Path(__file__))
log = getLogger("red.cogs.Audio.api.AudioAPIInterface")
_TOP_100_US = "https://www.youtube.com/playlist?list=PL4fGSI1pDJn5rWitrRWFKdm-ulaFiIyoK"
# TODO: Get random from global Cache


class AudioAPIInterface:
    """Handles music queries.

    Always tries the Local cache first, then Global cache before making API calls.
    """

    def __init__(
        self,
        bot: Red,
        config: Config,
        session: aiohttp.ClientSession,
        conn: APSWConnectionWrapper,
        cog: Union["Audio", Cog],
    ):
        self.bot = bot
        self.config = config
        self.conn = conn
        self.cog = cog
        self.spotify_api: SpotifyWrapper = SpotifyWrapper(self.bot, self.config, session, self.cog)
        self.youtube_api: YouTubeWrapper = YouTubeWrapper(self.bot, self.config, session, self.cog)
        self.local_cache_api = LocalCacheWrapper(self.bot, self.config, self.conn, self.cog)
        self.global_cache_api = GlobalCacheWrapper(self.bot, self.config, session, self.cog)
        self.persistent_queue_api = QueueInterface(self.bot, self.config, self.conn, self.cog)
        self._session: aiohttp.ClientSession = session
        self._tasks: MutableMapping = {}
        self._lock: asyncio.Lock = asyncio.Lock()

    async def initialize(self) -> None:
        """Initialises the Local Cache connection."""
        await self.local_cache_api.lavalink.init()
        await self.persistent_queue_api.init()

    def close(self) -> None:
        """Closes the Local Cache connection."""
        self.local_cache_api.lavalink.close()

    async def get_random_track_from_db(self, tries=0) -> Optional[MutableMapping]:
        """Get a random track from the local database and return it."""
        track: Optional[MutableMapping] = {}
        try:
            query_data = {}
            date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=7)
            date_timestamp = int(date.timestamp())
            query_data["day"] = date_timestamp
            max_age = await self.config.cache_age()
            maxage = datetime.datetime.now(tz=datetime.timezone.utc) - datetime.timedelta(
                days=max_age
            )
            maxage_int = int(time.mktime(maxage.timetuple()))
            query_data["maxage"] = maxage_int
            track = await self.local_cache_api.lavalink.fetch_random(query_data)
            if track is not None:
                if track.get("loadType") == "V2_COMPACT":
                    track["loadType"] = "V2_COMPAT"
                results = LoadResult(track)
                track = random.choice(list(results.tracks))
        except Exception as exc:
            log.trace("Failed to fetch a random track from database", exc_info=exc)
            track = {}

        if not track:
            return None

        return track

    async def route_tasks(
        self,
        action_type: str = None,
        data: Union[List[MutableMapping], MutableMapping] = None,
    ) -> None:
        """Separate the tasks and run them in the appropriate functions."""

        if not data:
            return
        if action_type == "insert" and isinstance(data, list):
            for table, d in data:
                if table == "lavalink":
                    await self.local_cache_api.lavalink.insert(d)
                elif table == "youtube":
                    await self.local_cache_api.youtube.insert(d)
                elif table == "spotify":
                    await self.local_cache_api.spotify.insert(d)
        elif action_type == "update" and isinstance(data, dict):
            for table, d in data:
                if table == "lavalink":
                    await self.local_cache_api.lavalink.update(data)
                elif table == "youtube":
                    await self.local_cache_api.youtube.update(data)
                elif table == "spotify":
                    await self.local_cache_api.spotify.update(data)
        elif action_type == "global" and isinstance(data, list):
            await asyncio.gather(*[self.global_cache_api.update_global(**d) for d in data])

    async def run_tasks(self, ctx: Optional[commands.Context] = None, message_id=None) -> None:
        """Run tasks for a specific context."""
        if message_id is not None:
            lock_id = message_id
        elif ctx is not None:
            lock_id = ctx.message.id
        else:
            return
        lock_author = ctx.author if ctx else None
        async with self._lock:
            if lock_id in self._tasks:
                log.trace("Running database writes for %s (%s)", lock_id, lock_author)
                try:
                    tasks = self._tasks[lock_id]
                    tasks = [self.route_tasks(a, tasks[a]) for a in tasks]
                    await asyncio.gather(*tasks, return_exceptions=False)
                    del self._tasks[lock_id]
                except Exception as exc:
                    log.verbose(
                        "Failed database writes for %s (%s)", lock_id, lock_author, exc_info=exc
                    )
                else:
                    log.trace("Completed database writes for %s (%s)", lock_id, lock_author)

    async def run_all_pending_tasks(self) -> None:
        """Run all pending tasks left in the cache, called on cog_unload."""
        async with self._lock:
            log.trace("Running pending writes to database")
            try:
                tasks: MutableMapping = {"update": [], "insert": [], "global": []}
                async for k, task in AsyncIter(self._tasks.items()):
                    async for t, args in AsyncIter(task.items()):
                        tasks[t].append(args)
                self._tasks = {}
                coro_tasks = [self.route_tasks(a, tasks[a]) for a in tasks]

                await asyncio.gather(*coro_tasks, return_exceptions=False)

            except Exception as exc:
                log.verbose("Failed database writes", exc_info=exc)
            else:
                log.trace("Completed pending writes to database have finished")

    def append_task(self, ctx: commands.Context, event: str, task: Tuple, _id: int = None) -> None:
        """Add a task to the cache to be run later."""
        lock_id = _id or ctx.message.id
        if lock_id not in self._tasks:
            self._tasks[lock_id] = {"update": [], "insert": [], "global": []}
        self._tasks[lock_id][event].append(task)

    async def fetch_spotify_query(
        self,
        ctx: commands.Context,
        query_type: str,
        uri: str,
        notifier: Optional[Notifier],
        skip_youtube: bool = False,
        current_cache_level: CacheLevel = CacheLevel.all(),
    ) -> List[str]:
        """Return youtube URLS for the spotify URL provided."""
        youtube_urls = []
        tracks = await self.fetch_from_spotify_api(
            query_type, uri, params=None, notifier=notifier, ctx=ctx
        )
        total_tracks = len(tracks)
        database_entries = []
        track_count = 0
        time_now = int(datetime.datetime.now(datetime.timezone.utc).timestamp())
        youtube_cache = CacheLevel.set_youtube().is_subset(current_cache_level)
        youtube_api_error = None
        global_api = self.cog.global_api_user.get("can_read")
        async for track in AsyncIter(tracks):
            if isinstance(track, str):
                break
            elif isinstance(track, dict) and track.get("error", {}).get("message") == "invalid id":
                continue
            (
                song_url,
                track_info,
                uri,
                artist_name,
                track_name,
                _id,
                _type,
            ) = await self.spotify_api.get_spotify_track_info(track, ctx)

            database_entries.append(
                {
                    "id": _id,
                    "type": _type,
                    "uri": uri,
                    "track_name": track_name,
                    "artist_name": artist_name,
                    "song_url": song_url,
                    "track_info": track_info,
                    "last_updated": time_now,
                    "last_fetched": time_now,
                }
            )
            if skip_youtube is False:
                val = None
                if youtube_cache:
                    try:
                        (val, last_update) = await self.local_cache_api.youtube.fetch_one(
                            {"track": track_info}
                        )
                    except Exception as exc:
                        log.verbose(
                            "Failed to fetch %r from YouTube table", track_info, exc_info=exc
                        )
                if val is None:
                    try:
                        val = await self.fetch_youtube_query(
                            ctx, track_info, current_cache_level=current_cache_level
                        )
                    except YouTubeApiError as exc:
                        val = None
                        youtube_api_error = exc.message
                if youtube_cache and val:
                    task = ("update", ("youtube", {"track": track_info}))
                    self.append_task(ctx, *task)
                if val:
                    youtube_urls.append(val)
            else:
                youtube_urls.append(track_info)
            track_count += 1
            if notifier is not None and ((track_count % 2 == 0) or (track_count == total_tracks)):
                await notifier.notify_user(current=track_count, total=total_tracks, key="youtube")
            if notifier is not None and (youtube_api_error and not global_api):
                error_embed = discord.Embed(
                    colour=await ctx.embed_colour(),
                    title=_("Failing to get tracks, skipping remaining."),
                )
                await notifier.update_embed(error_embed)
                break
            elif notifier is not None and (youtube_api_error and global_api):
                continue
        if CacheLevel.set_spotify().is_subset(current_cache_level):
            task = ("insert", ("spotify", database_entries))
            self.append_task(ctx, *task)
        return youtube_urls

    async def fetch_from_spotify_api(
        self,
        query_type: str,
        uri: str,
        recursive: Union[str, bool] = False,
        params: MutableMapping = None,
        notifier: Optional[Notifier] = None,
        ctx: Context = None,
    ) -> Union[List[MutableMapping], List[str]]:
        """Gets track info from spotify API."""

        if recursive is False:
            (call, params) = self.spotify_api.spotify_format_call(query_type, uri)
            results = await self.spotify_api.make_get_call(call, params)
        else:
            if isinstance(recursive, str):
                results = await self.spotify_api.make_get_call(recursive, params)
            else:
                results = {}
        try:
            if results["error"]["status"] == 401 and not recursive:
                raise SpotifyFetchError(
                    _(
                        "The Spotify API key or client secret has not been set properly. "
                        "\nUse `{prefix}audioset spotifyapi` for instructions."
                    )
                )
            elif recursive:
                return {"next": None}
        except KeyError:
            pass
        if recursive:
            return results
        tracks = []
        track_count = 0
        total_tracks = results.get("tracks", results).get("total", 1)
        while True:
            new_tracks: List = []
            if query_type == "track":
                new_tracks = results
                tracks.append(new_tracks)
            elif query_type == "album":
                tracks_raw = results.get("tracks", results).get("items", [])
                if tracks_raw:
                    new_tracks = tracks_raw
                    tracks.extend(new_tracks)
            else:
                tracks_raw = results.get("tracks", results).get("items", [])
                if tracks_raw:
                    new_tracks = [k["track"] for k in tracks_raw if k.get("track")]
                    tracks.extend(new_tracks)
            track_count += len(new_tracks)
            if notifier:
                await notifier.notify_user(current=track_count, total=total_tracks, key="spotify")
            try:
                if results.get("next") is not None:
                    results = await self.fetch_from_spotify_api(
                        query_type, uri, results["next"], params, notifier=notifier
                    )
                    continue
                else:
                    break
            except KeyError:
                raise SpotifyFetchError(
                    _("This doesn't seem to be a valid Spotify playlist/album URL or code.")
                )
        return tracks

    async def spotify_query(
        self,
        ctx: commands.Context,
        query_type: str,
        uri: str,
        skip_youtube: bool = False,
        notifier: Optional[Notifier] = None,
    ) -> List[str]:
        """Queries the Database then falls back to Spotify and YouTube APIs.

        Parameters
        ----------
        ctx: commands.Context
            The context this method is being called under.
        query_type : str
            Type of query to perform (Pl
        uri: str
            Spotify URL ID.
        skip_youtube:bool
            Whether or not to skip YouTube API Calls.
        notifier: Notifier
            A Notifier object to handle the user UI notifications while tracks are loaded.
        Returns
        -------
        List[str]
            List of Youtube URLs.
        """
        current_cache_level = CacheLevel(await self.config.cache_level())
        cache_enabled = CacheLevel.set_spotify().is_subset(current_cache_level)
        if query_type == "track" and cache_enabled:
            try:
                (val, last_update) = await self.local_cache_api.spotify.fetch_one(
                    {"uri": f"spotify:track:{uri}"}
                )
            except Exception as exc:
                log.verbose(
                    "Failed to fetch 'spotify:track:%s' from Spotify table", uri, exc_info=exc
                )
                val = None
        else:
            val = None
        youtube_urls = []
        if val is None:
            urls = await self.fetch_spotify_query(
                ctx,
                query_type,
                uri,
                notifier,
                skip_youtube,
                current_cache_level=current_cache_level,
            )
            youtube_urls.extend(urls)
        else:
            if query_type == "track" and cache_enabled:
                task = ("update", ("spotify", {"uri": f"spotify:track:{uri}"}))
                self.append_task(ctx, *task)
            youtube_urls.append(val)
        return youtube_urls

    async def spotify_enqueue(
        self,
        ctx: commands.Context,
        query_type: str,
        uri: str,
        enqueue: bool,
        player: lavalink.Player,
        lock: Callable,
        notifier: Optional[Notifier] = None,
        forced: bool = False,
        query_global: bool = True,
    ) -> List[lavalink.Track]:
        """Queries the Database then falls back to Spotify and YouTube APIs then Enqueued matched
        tracks.

        Parameters
        ----------
        ctx: commands.Context
            The context this method is being called under.
        query_type : str
            Type of query to perform (Pl
        uri: str
            Spotify URL ID.
        enqueue:bool
            Whether or not to enqueue the tracks
        player: lavalink.Player
            The current Player.
        notifier: Notifier
            A Notifier object to handle the user UI notifications while tracks are loaded.
        lock: Callable
            A callable handling the Track enqueue lock while spotify tracks are being added.
        query_global: bool
            Whether or not to query the global API.
        forced: bool
            Ignore Cache and make a fetch from API.
        Returns
        -------
        List[str]
            List of Youtube URLs.
        """
        await self.global_cache_api._get_api_key()
        globaldb_toggle = self.cog.global_api_user.get("can_read")
        global_entry = globaldb_toggle and query_global
        track_list: List = []
        has_not_allowed = False
        youtube_api_error = None
        skip_youtube_api = False
        try:
            current_cache_level = CacheLevel(await self.config.cache_level())
            guild_data = await self.config.guild(ctx.guild).all()
            enqueued_tracks = 0
            consecutive_fails = 0
            queue_dur = await self.cog.queue_duration(ctx)
            queue_total_duration = self.cog.format_time(queue_dur)
            before_queue_length = len(player.queue)
            tracks_from_spotify = await self.fetch_from_spotify_api(
                query_type, uri, params=None, notifier=notifier
            )
            total_tracks = len(tracks_from_spotify)
            if total_tracks < 1 and notifier is not None:
                lock(ctx, False)
                embed3 = discord.Embed(
                    colour=await ctx.embed_colour(),
                    title=_("This doesn't seem to be a supported Spotify URL or code."),
                )
                await notifier.update_embed(embed3)

                return track_list
            database_entries = []
            time_now = int(datetime.datetime.now(datetime.timezone.utc).timestamp())
            youtube_cache = CacheLevel.set_youtube().is_subset(current_cache_level)
            spotify_cache = CacheLevel.set_spotify().is_subset(current_cache_level)
            async for track_count, track in AsyncIter(tracks_from_spotify).enumerate(start=1):
                (
                    song_url,
                    track_info,
                    uri,
                    artist_name,
                    track_name,
                    _id,
                    _type,
                ) = await self.spotify_api.get_spotify_track_info(track, ctx)

                database_entries.append(
                    {
                        "id": _id,
                        "type": _type,
                        "uri": uri,
                        "track_name": track_name,
                        "artist_name": artist_name,
                        "song_url": song_url,
                        "track_info": track_info,
                        "last_updated": time_now,
                        "last_fetched": time_now,
                    }
                )
                val = None
                llresponse = None
                if youtube_cache:
                    try:
                        (val, last_updated) = await self.local_cache_api.youtube.fetch_one(
                            {"track": track_info}
                        )
                    except Exception as exc:
                        log.verbose(
                            "Failed to fetch %r from YouTube table", track_info, exc_info=exc
                        )
                should_query_global = globaldb_toggle and query_global and val is None
                if should_query_global:
                    llresponse = await self.global_cache_api.get_spotify(track_name, artist_name)
                    if llresponse:
                        if llresponse.get("loadType") == "V2_COMPACT":
                            llresponse["loadType"] = "V2_COMPAT"
                        llresponse = LoadResult(llresponse)
                    val = llresponse or None
                if val is None and not skip_youtube_api:
                    try:
                        val = await self.fetch_youtube_query(
                            ctx, track_info, current_cache_level=current_cache_level
                        )
                    except YouTubeApiError as exc:
                        val = None
                        youtube_api_error = exc.message
                        skip_youtube_api = True
                if not youtube_api_error:
                    if youtube_cache and val and llresponse is None:
                        task = ("update", ("youtube", {"track": track_info}))
                        self.append_task(ctx, *task)

                    if isinstance(llresponse, LoadResult):
                        track_object = llresponse.tracks
                    elif val:
                        result = None
                        if should_query_global:
                            llresponse = await self.global_cache_api.get_call(val)
                            if llresponse:
                                if llresponse.get("loadType") == "V2_COMPACT":
                                    llresponse["loadType"] = "V2_COMPAT"
                                llresponse = LoadResult(llresponse)
                            result = llresponse or None
                        if not result:
                            try:
                                (result, called_api) = await self.fetch_track(
                                    ctx,
                                    player,
                                    Query.process_input(val, self.cog.local_folder_current_path),
                                    forced=forced,
                                    should_query_global=not should_query_global,
                                )
                            except (RuntimeError, aiohttp.ServerDisconnectedError):
                                lock(ctx, False)
                                error_embed = discord.Embed(
                                    colour=await ctx.embed_colour(),
                                    title=_(
                                        "The connection was reset while loading the playlist."
                                    ),
                                )
                                if notifier is not None:
                                    await notifier.update_embed(error_embed)
                                break
                            except asyncio.TimeoutError:
                                lock(ctx, False)
                                error_embed = discord.Embed(
                                    colour=await ctx.embed_colour(),
                                    title=_("Player timeout, skipping remaining tracks."),
                                )
                                if notifier is not None:
                                    await notifier.update_embed(error_embed)
                                break
                        track_object = result.tracks
                    else:
                        track_object = []
                else:
                    track_object = []
                if (track_count % 2 == 0) or (track_count == total_tracks):
                    key = "lavalink"
                    seconds = "???"
                    second_key = None
                    if notifier is not None:
                        await notifier.notify_user(
                            current=track_count,
                            total=total_tracks,
                            key=key,
                            seconds_key=second_key,
                            seconds=seconds,
                        )

                if (youtube_api_error and not global_entry) or consecutive_fails >= (
                    20 if global_entry else 10
                ):
                    error_embed = discord.Embed(
                        colour=await ctx.embed_colour(),
                        title=_("Failing to get tracks, skipping remaining."),
                    )
                    if notifier is not None:
                        await notifier.update_embed(error_embed)
                    if youtube_api_error:
                        lock(ctx, False)
                        raise SpotifyFetchError(message=youtube_api_error)
                    break
                if not track_object:
                    consecutive_fails += 1
                    continue
                consecutive_fails = 0
                single_track = track_object[0]
                query = Query.process_input(single_track, self.cog.local_folder_current_path)
                if not await self.cog.is_query_allowed(
                    self.config,
                    ctx,
                    f"{single_track.title} {single_track.author} {single_track.uri} {query}",
                    query_obj=query,
                ):
                    has_not_allowed = True
                    log.debug("Query is not allowed in %r (%s)", ctx.guild.name, ctx.guild.id)
                    continue
                track_list.append(single_track)
                if enqueue:
                    if len(player.queue) >= 10000:
                        continue
                    if guild_data["maxlength"] > 0:
                        if self.cog.is_track_length_allowed(single_track, guild_data["maxlength"]):
                            enqueued_tracks += 1
                            single_track.extras.update(
                                {
                                    "enqueue_time": int(time.time()),
                                    "vc": player.channel.id,
                                    "requester": ctx.author.id,
                                }
                            )
                            player.add(ctx.author, single_track)
                            self.bot.dispatch(
                                "red_audio_track_enqueue",
                                player.guild,
                                single_track,
                                ctx.author,
                            )
                    else:
                        enqueued_tracks += 1
                        single_track.extras.update(
                            {
                                "enqueue_time": int(time.time()),
                                "vc": player.channel.id,
                                "requester": ctx.author.id,
                            }
                        )
                        player.add(ctx.author, single_track)
                        self.bot.dispatch(
                            "red_audio_track_enqueue",
                            player.guild,
                            single_track,
                            ctx.author,
                        )

                    if not player.current:
                        await player.play()
            if enqueue and tracks_from_spotify:
                if total_tracks > enqueued_tracks:
                    maxlength_msg = _(" {bad_tracks} tracks cannot be queued.").format(
                        bad_tracks=(total_tracks - enqueued_tracks)
                    )
                else:
                    maxlength_msg = ""

                embed = discord.Embed(
                    colour=await ctx.embed_colour(),
                    title=_("Playlist Enqueued"),
                    description=_("Added {num} tracks to the queue.{maxlength_msg}").format(
                        num=enqueued_tracks, maxlength_msg=maxlength_msg
                    ),
                )
                if not guild_data["shuffle"] and queue_dur > 0:
                    embed.set_footer(
                        text=_(
                            "{time} until start of playlist"
                            " playback: starts at #{position} in queue"
                        ).format(time=queue_total_duration, position=before_queue_length + 1)
                    )

                if notifier is not None:
                    await notifier.update_embed(embed)
            lock(ctx, False)
            if not track_list and not has_not_allowed:
                raise SpotifyFetchError(
                    message=_(
                        "Nothing found.\nThe YouTube API key may be invalid "
                        "or you may be rate limited on YouTube's search service.\n"
                        "Check the YouTube API key again and follow the instructions "
                        "at `{prefix}audioset youtubeapi`."
                    )
                )
            player.maybe_shuffle()

            if spotify_cache:
                task = ("insert", ("spotify", database_entries))
                self.append_task(ctx, *task)
        except Exception as exc:
            lock(ctx, False)
            raise exc
        finally:
            lock(ctx, False)
        return track_list

    async def fetch_youtube_query(
        self,
        ctx: commands.Context,
        track_info: str,
        current_cache_level: CacheLevel = CacheLevel.all(),
    ) -> Optional[str]:
        """Call the Youtube API and returns the youtube URL that the query matched."""
        track_url = await self.youtube_api.get_call(track_info)
        if CacheLevel.set_youtube().is_subset(current_cache_level) and track_url:
            time_now = int(datetime.datetime.now(datetime.timezone.utc).timestamp())
            task = (
                "insert",
                (
                    "youtube",
                    [
                        {
                            "track_info": track_info,
                            "track_url": track_url,
                            "last_updated": time_now,
                            "last_fetched": time_now,
                        }
                    ],
                ),
            )
            self.append_task(ctx, *task)
        return track_url

    async def fetch_from_youtube_api(
        self, ctx: commands.Context, track_info: str
    ) -> Optional[str]:
        """Gets an YouTube URL from for the query."""
        current_cache_level = CacheLevel(await self.config.cache_level())
        cache_enabled = CacheLevel.set_youtube().is_subset(current_cache_level)
        val = None
        if cache_enabled:
            try:
                (val, update) = await self.local_cache_api.youtube.fetch_one({"track": track_info})
            except Exception as exc:
                log.verbose("Failed to fetch %r from YouTube table", track_info, exc_info=exc)
        if val is None:
            try:
                youtube_url = await self.fetch_youtube_query(
                    ctx, track_info, current_cache_level=current_cache_level
                )
            except YouTubeApiError:
                youtube_url = None
        else:
            if cache_enabled:
                task = ("update", ("youtube", {"track": track_info}))
                self.append_task(ctx, *task)
            youtube_url = val
        return youtube_url

    async def fetch_track(
        self,
        ctx: commands.Context,
        player: lavalink.Player,
        query: Query,
        forced: bool = False,
        lazy: bool = False,
        should_query_global: bool = True,
    ) -> Tuple[LoadResult, bool]:
        """A replacement for :code:`lavalink.Player.load_tracks`. This will try to get a valid
        cached entry first if not found or if in valid it will then call the lavalink API.

        Parameters
        ----------
        ctx: commands.Context
            The context this method is being called under.
        player : lavalink.Player
            The player who's requesting the query.
        query: audio_dataclasses.Query
            The Query object for the query in question.
        forced:bool
            Whether or not to skip cache and call API first.
        lazy:bool
            If set to True, it will not call the api if a track is not found.
        should_query_global:bool
            If the method should query the global database.

        Returns
        -------
        Tuple[lavalink.LoadResult, bool]
            Tuple with the Load result and whether or not the API was called.
        """
        current_cache_level = CacheLevel(await self.config.cache_level())
        cache_enabled = CacheLevel.set_lavalink().is_subset(current_cache_level)
        val = None
        query = Query.process_input(query, self.cog.local_folder_current_path)
        query_string = str(query)
        globaldb_toggle = self.cog.global_api_user.get("can_read")
        valid_global_entry = False
        results = None
        called_api = False
        prefer_lyrics = await self.cog.get_lyrics_status(ctx)
        if prefer_lyrics and query.is_youtube and query.is_search:
            query_string = f"{query} - lyrics"
        if cache_enabled and not forced and not query.is_local:
            try:
                (val, last_updated) = await self.local_cache_api.lavalink.fetch_one(
                    {"query": query_string}
                )
            except Exception as exc:
                log.verbose("Failed to fetch %r from Lavalink table", query_string, exc_info=exc)

            if val and isinstance(val, dict):
                log.trace("Updating Local Database with %r", query_string)
                task = ("update", ("lavalink", {"query": query_string}))
                self.append_task(ctx, *task)
            else:
                val = None

            if val and not forced and isinstance(val, dict):
                valid_global_entry = False
                called_api = False
            else:
                val = None
        if (
            globaldb_toggle
            and not val
            and should_query_global
            and not forced
            and not query.is_local
            and not query.is_spotify
        ):
            valid_global_entry = False
            with contextlib.suppress(Exception):
                global_entry = await self.global_cache_api.get_call(query=query)
                if global_entry.get("loadType") == "V2_COMPACT":
                    global_entry["loadType"] = "V2_COMPAT"
                results = LoadResult(global_entry)
                if results.load_type in [
                    LoadType.PLAYLIST_LOADED,
                    LoadType.TRACK_LOADED,
                    LoadType.SEARCH_RESULT,
                    LoadType.V2_COMPAT,
                ]:
                    valid_global_entry = True
                if valid_global_entry:
                    log.trace("Querying Global DB api for %r", query)
                    results, called_api = results, False
        if valid_global_entry:
            pass
        elif lazy is True:
            called_api = False
        elif val and not forced and isinstance(val, dict):
            data = val
            data["query"] = query_string
            if data.get("loadType") == "V2_COMPACT":
                data["loadType"] = "V2_COMPAT"
            results = LoadResult(data)
            called_api = False
            if results.has_error:
                # If cached value has an invalid entry make a new call so that it gets updated
                results, called_api = await self.fetch_track(ctx, player, query, forced=True)
            valid_global_entry = False
        else:
            log.trace("Querying Lavalink api for %r", query_string)
            called_api = True
            try:
                results = await player.load_tracks(query_string)
            except KeyError:
                results = None
            except RuntimeError:
                raise TrackEnqueueError
        if results is None:
            results = LoadResult({"loadType": "LOAD_FAILED", "playlistInfo": {}, "tracks": []})
            valid_global_entry = False
        update_global = (
            globaldb_toggle and not valid_global_entry and self.global_cache_api.has_api_key
        )
        with contextlib.suppress(Exception):
            if (
                update_global
                and not query.is_local
                and not results.has_error
                and len(results.tracks) >= 1
            ):
                global_task = ("global", dict(llresponse=results, query=query))
                self.append_task(ctx, *global_task)
        if (
            cache_enabled
            and results.load_type
            and not results.has_error
            and not query.is_local
            and results.tracks
        ):
            try:
                time_now = int(datetime.datetime.now(datetime.timezone.utc).timestamp())
                data = json.dumps(results._raw)
                if all(k in data for k in ["loadType", "playlistInfo", "isSeekable", "isStream"]):
                    task = (
                        "insert",
                        (
                            "lavalink",
                            [
                                {
                                    "query": query_string,
                                    "data": data,
                                    "last_updated": time_now,
                                    "last_fetched": time_now,
                                }
                            ],
                        ),
                    )
                    self.append_task(ctx, *task)
            except Exception as exc:
                log.verbose(
                    "Failed to enqueue write task for %r to Lavalink table",
                    query_string,
                    exc_info=exc,
                )
        return results, called_api

    async def autoplay(self, player: lavalink.Player, playlist_api: PlaylistWrapper):
        """Enqueue a random track."""
        autoplaylist = await self.config.guild(player.guild).autoplaylist()
        current_cache_level = CacheLevel(await self.config.cache_level())
        cache_enabled = CacheLevel.set_lavalink().is_subset(current_cache_level)
        notify_channel_id = player.fetch("notify_channel")
        playlist = None
        tracks = None
        if autoplaylist["enabled"]:
            try:
                playlist = await get_playlist(
                    autoplaylist["id"],
                    autoplaylist["scope"],
                    self.bot,
                    playlist_api,
                    player.guild,
                    player.guild.me,
                )
                tracks = playlist.tracks_obj
            except Exception as exc:
                log.verbose("Failed to fetch playlist for autoplay", exc_info=exc)

        if not tracks or not getattr(playlist, "tracks", None):
            if cache_enabled:
                track = await self.get_random_track_from_db()
                tracks = [] if not track else [track]
            if not tracks:
                ctx = namedtuple("Context", "message guild cog")
                (results, called_api) = await self.fetch_track(
                    cast(commands.Context, ctx(player.guild, player.guild, self.cog)),
                    player,
                    Query.process_input(_TOP_100_US, self.cog.local_folder_current_path),
                )
                tracks = list(results.tracks)
        if tracks:
            multiple = len(tracks) > 1
            valid = not multiple
            tries = len(tracks)
            track = tracks[0]
            while valid is False and multiple:
                tries -= 1
                if tries <= 0:
                    raise DatabaseError("No valid entry found")
                track = random.choice(tracks)
                query = Query.process_input(track, self.cog.local_folder_current_path)
                await asyncio.sleep(0.001)
                if (not query.valid) or (
                    query.is_local
                    and query.local_track_path is not None
                    and not query.local_track_path.exists()
                ):
                    continue
                notify_channel = player.guild.get_channel_or_thread(notify_channel_id)
                if not await self.cog.is_query_allowed(
                    self.config,
                    notify_channel,
                    f"{track.title} {track.author} {track.uri} {query}",
                    query_obj=query,
                ):
                    log.debug(
                        "Query is not allowed in %r (%s)", player.guild.name, player.guild.id
                    )
                    continue
                valid = True
            track.extras.update(
                {
                    "autoplay": True,
                    "enqueue_time": int(time.time()),
                    "vc": player.channel.id,
                    "requester": player.guild.me.id,
                }
            )
            player.add(player.guild.me, track)
            self.bot.dispatch(
                "red_audio_track_auto_play",
                player.guild,
                track,
                player.guild.me,
                player,
            )
            if notify_channel_id:
                await self.config.guild_from_id(
                    guild_id=player.guild.id
                ).currently_auto_playing_in.set([notify_channel_id, player.channel.id])
            else:
                await self.config.guild_from_id(
                    guild_id=player.guild.id
                ).currently_auto_playing_in.set([])
            if not player.current:
                await player.play()

    async def fetch_all_contribute(self) -> List[LavalinkCacheFetchForGlobalResult]:
        return await self.local_cache_api.lavalink.fetch_all_for_global()