redbot/cogs/audio/core/utilities/playlists.py
import asyncio
import contextlib
import datetime
import json
import math
import random
import time
from pathlib import Path
from typing import List, MutableMapping, Optional, Tuple, Union
import aiohttp
import discord
import lavalink
from lavalink import NodeNotFound
from red_commons.logging import getLogger
from redbot.core import commands
from redbot.core.i18n import Translator
from redbot.core.utils import AsyncIter
from redbot.core.utils.chat_formatting import box
from redbot.core.utils.menus import start_adding_reactions
from redbot.core.utils.predicates import ReactionPredicate
from ...apis.playlist_interface import Playlist, PlaylistCompat23, create_playlist
from ...audio_dataclasses import _PARTIALLY_SUPPORTED_MUSIC_EXT, Query
from ...errors import TooManyMatches, TrackEnqueueError
from ...utils import Notifier, PlaylistScope
from ..abc import MixinMeta
from ..cog_utils import CompositeMetaClass
log = getLogger("red.cogs.Audio.cog.Utilities.playlists")
_ = Translator("Audio", Path(__file__))
CURATED_DATA = (
"https://gist.githubusercontent.com/aikaterna/4b5de6c420cd6f12b83cb895ca2de16a/raw/json"
)
class PlaylistUtilities(MixinMeta, metaclass=CompositeMetaClass):
async def can_manage_playlist(
self,
scope: str,
playlist: Playlist,
ctx: commands.Context,
user,
guild,
bypass: bool = False,
) -> bool:
is_owner = await self.bot.is_owner(ctx.author)
has_perms = False
user_to_query = user
guild_to_query = guild
dj_enabled = None
playlist_author = (
guild.get_member(playlist.author)
if guild
else self.bot.get_user(playlist.author) or user
)
is_different_user = len({playlist.author, user_to_query.id, ctx.author.id}) != 1
is_different_guild = True if guild_to_query is None else ctx.guild.id != guild_to_query.id
if getattr(playlist, "id", 0) == 42069:
has_perms = bypass
elif is_owner:
has_perms = True
elif playlist.scope == PlaylistScope.USER.value:
if not is_different_user:
has_perms = True
elif playlist.scope == PlaylistScope.GUILD.value and not is_different_guild:
dj_enabled = self._dj_status_cache.setdefault(
ctx.guild.id, await self.config.guild(ctx.guild).dj_enabled()
)
if (
guild.owner_id == ctx.author.id
or (dj_enabled and await self._has_dj_role(ctx, ctx.author))
or (await self.bot.is_mod(ctx.author))
or (not dj_enabled and not is_different_user)
):
has_perms = True
if has_perms is False:
if hasattr(playlist, "name"):
msg = _(
"You do not have the permissions to manage {name} (`{id}`) [**{scope}**]."
).format(
user=playlist_author,
name=playlist.name,
id=playlist.id,
scope=self.humanize_scope(
playlist.scope,
ctx=guild_to_query
if playlist.scope == PlaylistScope.GUILD.value
else playlist_author
if playlist.scope == PlaylistScope.USER.value
else None,
),
)
elif playlist.scope == PlaylistScope.GUILD.value and (
is_different_guild or dj_enabled
):
msg = _(
"You do not have the permissions to manage that playlist in {guild}."
).format(guild=guild_to_query)
elif (
playlist.scope in [PlaylistScope.GUILD.value, PlaylistScope.USER.value]
and is_different_user
):
msg = _(
"You do not have the permissions to manage playlist owned by {user}."
).format(user=playlist_author)
else:
msg = _(
"You do not have the permissions to manage playlists in {scope} scope."
).format(scope=self.humanize_scope(scope, the=True))
await self.send_embed_msg(ctx, title=_("No access to playlist."), description=msg)
return False
return True
async def get_playlist_match(
self,
context: commands.Context,
matches: MutableMapping,
scope: str,
author: discord.User,
guild: discord.Guild,
specified_user: bool = False,
) -> Tuple[Optional[Playlist], str, str]:
"""
Parameters
----------
context: commands.Context
The context in which this is being called.
matches: dict
A dict of the matches found where key is scope and value is matches.
scope:str
The custom config scope. A value from :code:`PlaylistScope`.
author: discord.User
The user.
guild: discord.Guild
The guild.
specified_user: bool
Whether or not a user ID was specified via argparse.
Returns
-------
Tuple[Optional[Playlist], str, str]
Tuple of Playlist or None if none found, original user input and scope.
Raises
------
`TooManyMatches`
When more than 10 matches are found or
When multiple matches are found but none is selected.
"""
correct_scope_matches: List[Playlist]
original_input = matches.get("arg")
lazy_match = False
if scope is None:
correct_scope_matches_temp: MutableMapping = matches.get("all")
lazy_match = True
else:
correct_scope_matches_temp: MutableMapping = matches.get(scope)
guild_to_query = guild.id
user_to_query = author.id
correct_scope_matches_user = []
correct_scope_matches_guild = []
correct_scope_matches_global = []
if not correct_scope_matches_temp:
return None, original_input, scope or PlaylistScope.GUILD.value
if lazy_match or (scope == PlaylistScope.USER.value):
correct_scope_matches_user = [
p for p in matches.get(PlaylistScope.USER.value) if user_to_query == p.scope_id
]
if lazy_match or (scope == PlaylistScope.GUILD.value and not correct_scope_matches_user):
if specified_user:
correct_scope_matches_guild = [
p
for p in matches.get(PlaylistScope.GUILD.value)
if guild_to_query == p.scope_id and p.author == user_to_query
]
else:
correct_scope_matches_guild = [
p
for p in matches.get(PlaylistScope.GUILD.value)
if guild_to_query == p.scope_id
]
if lazy_match or (
scope == PlaylistScope.GLOBAL.value
and not correct_scope_matches_user
and not correct_scope_matches_guild
):
if specified_user:
correct_scope_matches_global = [
p for p in matches.get(PlaylistScope.GLOBAL.value) if p.author == user_to_query
]
else:
correct_scope_matches_global = [p for p in matches.get(PlaylistScope.GLOBAL.value)]
correct_scope_matches = [
*correct_scope_matches_global,
*correct_scope_matches_guild,
*correct_scope_matches_user,
]
match_count = len(correct_scope_matches)
if match_count > 1:
correct_scope_matches2 = [
p for p in correct_scope_matches if p.name == str(original_input).strip()
]
if correct_scope_matches2:
correct_scope_matches = correct_scope_matches2
elif original_input.isnumeric():
arg = int(original_input)
correct_scope_matches3 = [p for p in correct_scope_matches if p.id == arg]
if correct_scope_matches3:
correct_scope_matches = correct_scope_matches3
match_count = len(correct_scope_matches)
# We done all the trimming we can with the info available time to ask the user
if match_count > 10:
if original_input.isnumeric():
arg = int(original_input)
correct_scope_matches = [p for p in correct_scope_matches if p.id == arg]
if match_count > 10:
raise TooManyMatches(
_(
"{match_count} playlists match {original_input}: "
"Please try to be more specific, or use the playlist ID."
).format(match_count=match_count, original_input=original_input)
)
elif match_count == 1:
return correct_scope_matches[0], original_input, correct_scope_matches[0].scope
elif match_count == 0:
return None, original_input, scope or PlaylistScope.GUILD.value
# TODO : Convert this section to a new paged reaction menu when Toby Menus are Merged
pos_len = 3
playlists = f"{'#':{pos_len}}\n"
number = 0
correct_scope_matches = sorted(correct_scope_matches, key=lambda x: x.name.lower())
async for number, playlist in AsyncIter(correct_scope_matches).enumerate(start=1):
author = self.bot.get_user(playlist.author) or playlist.author or _("Unknown")
line = _(
"{number}."
" <{playlist.name}>\n"
" - Scope: < {scope} >\n"
" - ID: < {playlist.id} >\n"
" - Tracks: < {tracks} >\n"
" - Author: < {author} >\n\n"
).format(
number=number,
playlist=playlist,
scope=self.humanize_scope(playlist.scope),
tracks=len(playlist.tracks),
author=author,
)
playlists += line
embed = discord.Embed(
title=_("{playlists} playlists found, which one would you like?").format(
playlists=number
),
description=box(playlists, lang="md"),
colour=await context.embed_colour(),
)
msg = await context.send(embed=embed)
available_emojis = list(ReactionPredicate.NUMBER_EMOJIS[1:])
available_emojis.append("🔟")
emojis = available_emojis[: len(correct_scope_matches)]
emojis.append("\N{CROSS MARK}")
start_adding_reactions(msg, emojis)
pred = ReactionPredicate.with_emojis(emojis, msg, user=context.author)
try:
await context.bot.wait_for("reaction_add", check=pred, timeout=60)
except asyncio.TimeoutError:
with contextlib.suppress(discord.HTTPException):
await msg.delete()
raise TooManyMatches(
_("Too many matches found and you did not select which one you wanted.")
)
if emojis[pred.result] == "\N{CROSS MARK}":
with contextlib.suppress(discord.HTTPException):
await msg.delete()
raise TooManyMatches(
_("Too many matches found and you did not select which one you wanted.")
)
with contextlib.suppress(discord.HTTPException):
await msg.delete()
return (
correct_scope_matches[pred.result],
original_input,
correct_scope_matches[pred.result].scope,
)
async def _build_playlist_list_page(
self, ctx: commands.Context, page_num: int, abc_names: List, scope: Optional[str]
) -> discord.Embed:
plist_num_pages = math.ceil(len(abc_names) / 5)
plist_idx_start = (page_num - 1) * 5
plist_idx_end = plist_idx_start + 5
plist = ""
async for i, playlist_info in AsyncIter(
abc_names[plist_idx_start:plist_idx_end]
).enumerate(start=plist_idx_start):
item_idx = i + 1
plist += "`{}.` {}".format(item_idx, playlist_info)
if scope is None:
embed = discord.Embed(
colour=await ctx.embed_colour(),
title=_("Playlists you can access in this server:"),
description=plist,
)
else:
embed = discord.Embed(
colour=await ctx.embed_colour(),
title=_("Playlists for {scope}:").format(scope=scope),
description=plist,
)
embed.set_footer(
text=_("Page {page_num}/{total_pages} | {num} playlists.").format(
page_num=page_num, total_pages=plist_num_pages, num=len(abc_names)
)
)
return embed
async def _load_v3_playlist(
self,
ctx: commands.Context,
scope: str,
uploaded_playlist_name: str,
uploaded_playlist_url: str,
track_list: List,
author: Union[discord.User, discord.Member],
guild: Union[discord.Guild],
) -> None:
embed1 = discord.Embed(title=_("Please wait, adding tracks..."))
playlist_msg = await self.send_embed_msg(ctx, embed=embed1)
track_count = len(track_list)
uploaded_track_count = len(track_list)
await asyncio.sleep(1)
embed2 = discord.Embed(
colour=await ctx.embed_colour(),
title=_("Loading track {num}/{total}...").format(
num=track_count, total=uploaded_track_count
),
)
await playlist_msg.edit(embed=embed2)
playlist = await create_playlist(
ctx,
self.playlist_api,
scope,
uploaded_playlist_name,
uploaded_playlist_url,
track_list,
author,
guild,
)
scope_name = self.humanize_scope(
scope, ctx=guild if scope == PlaylistScope.GUILD.value else author
)
if not track_count:
msg = _("Empty playlist {name} (`{id}`) [**{scope}**] created.").format(
name=playlist.name, id=playlist.id, scope=scope_name
)
elif uploaded_track_count != track_count:
bad_tracks = uploaded_track_count - track_count
msg = _(
"Added {num} tracks from the {playlist_name} playlist. {num_bad} track(s) "
"could not be loaded."
).format(num=track_count, playlist_name=playlist.name, num_bad=bad_tracks)
else:
msg = _("Added {num} tracks from the {playlist_name} playlist.").format(
num=track_count, playlist_name=playlist.name
)
embed3 = discord.Embed(
colour=await ctx.embed_colour(), title=_("Playlist Saved"), description=msg
)
await playlist_msg.edit(embed=embed3)
database_entries = []
time_now = int(datetime.datetime.now(datetime.timezone.utc).timestamp())
async for t in AsyncIter(track_list):
uri = t.get("info", {}).get("uri")
if uri:
t = {"loadType": "V2_COMPAT", "tracks": [t], "query": uri}
data = json.dumps(t)
if all(k in data for k in ["loadType", "playlistInfo", "isSeekable", "isStream"]):
database_entries.append(
{
"query": uri,
"data": data,
"last_updated": time_now,
"last_fetched": time_now,
}
)
if database_entries:
await self.api_interface.local_cache_api.lavalink.insert(database_entries)
async def _load_v2_playlist(
self,
ctx: commands.Context,
uploaded_track_list,
player: lavalink.player.Player,
playlist_url: str,
uploaded_playlist_name: str,
scope: str,
author: Union[discord.User, discord.Member],
guild: Union[discord.Guild],
):
track_list = []
successful_count = 0
uploaded_track_count = len(uploaded_track_list)
embed1 = discord.Embed(title=_("Please wait, adding tracks..."))
playlist_msg = await self.send_embed_msg(ctx, embed=embed1)
notifier = Notifier(ctx, playlist_msg, {"playlist": _("Loading track {num}/{total}...")})
async for track_count, song_url in AsyncIter(uploaded_track_list).enumerate(start=1):
try:
try:
result, called_api = await self.api_interface.fetch_track(
ctx, player, Query.process_input(song_url, self.local_folder_current_path)
)
except TrackEnqueueError:
self.update_player_lock(ctx, False)
return await self.send_embed_msg(
ctx,
title=_("Unable to Get Track"),
description=_(
"I'm unable to get a track from the Lavalink node at the moment, "
"try again in a few minutes."
),
)
except Exception as e:
self.update_player_lock(ctx, False)
raise e
track = result.tracks[0]
except Exception as exc:
log.verbose("Failed to get track for %r", song_url, exc_info=exc)
continue
try:
track_obj = self.get_track_json(player, other_track=track)
track_list.append(track_obj)
successful_count += 1
except Exception as exc:
log.verbose("Failed to create track for %r", track, exc_info=exc)
continue
if (track_count % 2 == 0) or (track_count == len(uploaded_track_list)):
await notifier.notify_user(
current=track_count, total=len(uploaded_track_list), key="playlist"
)
playlist = await create_playlist(
ctx,
self.playlist_api,
scope,
uploaded_playlist_name,
playlist_url,
track_list,
author,
guild,
)
scope_name = self.humanize_scope(
scope, ctx=guild if scope == PlaylistScope.GUILD.value else author
)
if not successful_count:
msg = _("Empty playlist {name} (`{id}`) [**{scope}**] created.").format(
name=playlist.name, id=playlist.id, scope=scope_name
)
elif uploaded_track_count != successful_count:
bad_tracks = uploaded_track_count - successful_count
msg = _(
"Added {num} tracks from the {playlist_name} playlist. {num_bad} track(s) "
"could not be loaded."
).format(num=successful_count, playlist_name=playlist.name, num_bad=bad_tracks)
else:
msg = _("Added {num} tracks from the {playlist_name} playlist.").format(
num=successful_count, playlist_name=playlist.name
)
embed3 = discord.Embed(
colour=await ctx.embed_colour(), title=_("Playlist Saved"), description=msg
)
await playlist_msg.edit(embed=embed3)
async def _maybe_update_playlist(
self, ctx: commands.Context, player: lavalink.player.Player, playlist: Playlist
) -> Tuple[List[lavalink.Track], List[lavalink.Track], Playlist]:
if getattr(playlist, "id", 0) == 42069:
_, updated_tracks = await self._get_bundled_playlist_tracks()
results = {}
old_tracks = playlist.tracks_obj
new_tracks = [lavalink.Track(data=track) for track in updated_tracks]
removed = list(set(old_tracks) - set(new_tracks))
added = list(set(new_tracks) - set(old_tracks))
if removed or added:
await playlist.edit(results)
return added, removed, playlist
if playlist.url is None:
return [], [], playlist
results = {}
updated_tracks = await self.fetch_playlist_tracks(
ctx,
player,
Query.process_input(playlist.url, self.local_folder_current_path),
skip_cache=True,
)
if isinstance(updated_tracks, discord.Message):
return [], [], playlist
if not updated_tracks:
# No Tracks available on url Lets set it to none to avoid repeated calls here
results["url"] = None
if updated_tracks: # Tracks have been updated
results["tracks"] = updated_tracks
old_tracks = playlist.tracks_obj
new_tracks = [lavalink.Track(data=track) for track in updated_tracks]
removed = list(set(old_tracks) - set(new_tracks))
added = list(set(new_tracks) - set(old_tracks))
if removed or added:
await playlist.edit(results)
return added, removed, playlist
async def _playlist_check(self, ctx: commands.Context) -> bool:
if not self._player_check(ctx):
if self.lavalink_connection_aborted:
msg = _("Connection to Lavalink node has failed")
desc = None
if await self.bot.is_owner(ctx.author):
desc = _("Please check your console or logs for details.")
await self.send_embed_msg(ctx, title=msg, description=desc)
return False
try:
if (
not self.can_join_and_speak(ctx.author.voice.channel)
or not ctx.author.voice.channel.permissions_for(ctx.me).move_members
and self.is_vc_full(ctx.author.voice.channel)
):
await self.send_embed_msg(
ctx,
title=_("Unable To Get Playlists"),
description=_(
"I don't have permission to connect and speak in your channel."
),
)
return False
await lavalink.connect(
ctx.author.voice.channel,
self_deaf=await self.config.guild_from_id(ctx.guild.id).auto_deafen(),
)
except NodeNotFound:
await self.send_embed_msg(
ctx,
title=_("Unable To Get Playlists"),
description=_("Connection to Lavalink node has not yet been established."),
)
return False
except AttributeError:
await self.send_embed_msg(
ctx,
title=_("Unable To Get Playlists"),
description=_("Connect to a voice channel first."),
)
return False
player = lavalink.get_player(ctx.guild.id)
player.store("notify_channel", ctx.channel.id)
if (
not ctx.author.voice or ctx.author.voice.channel != player.channel
) and not await self._can_instaskip(ctx, ctx.author):
await self.send_embed_msg(
ctx,
title=_("Unable To Get Playlists"),
description=_("You must be in the voice channel to use the playlist command."),
)
return False
await self._eq_check(ctx, player)
await self.set_player_settings(ctx)
return True
async def fetch_playlist_tracks(
self,
ctx: commands.Context,
player: lavalink.player.Player,
query: Query,
skip_cache: bool = False,
) -> Union[discord.Message, None, List[MutableMapping]]:
search = query.is_search
tracklist = []
if query.is_spotify:
try:
if self.play_lock[ctx.guild.id]:
return await self.send_embed_msg(
ctx,
title=_("Unable To Get Tracks"),
description=_("Wait until the playlist has finished loading."),
)
except KeyError:
pass
tracks = await self._get_spotify_tracks(ctx, query, forced=skip_cache)
if isinstance(tracks, discord.Message):
return None
if not tracks:
embed = discord.Embed(title=_("Nothing found."))
if query.is_local and query.suffix in _PARTIALLY_SUPPORTED_MUSIC_EXT:
embed = discord.Embed(title=_("Track is not playable."))
embed.description = _(
"**{suffix}** is not a fully supported format and some "
"tracks may not play."
).format(suffix=query.suffix)
return await self.send_embed_msg(ctx, embed=embed)
async for track in AsyncIter(tracks):
track_obj = self.get_track_json(player, other_track=track)
tracklist.append(track_obj)
self.update_player_lock(ctx, False)
elif query.is_search:
try:
result, called_api = await self.api_interface.fetch_track(
ctx, player, query, forced=skip_cache
)
except TrackEnqueueError:
self.update_player_lock(ctx, False)
return await self.send_embed_msg(
ctx,
title=_("Unable to Get Track"),
description=_(
"I'm unable to get a track from Lavalink node at the moment, try again in a few "
"minutes."
),
)
except Exception as e:
self.update_player_lock(ctx, False)
raise e
tracks = result.tracks
if not tracks:
embed = discord.Embed(title=_("Nothing found."))
if query.is_local and query.suffix in _PARTIALLY_SUPPORTED_MUSIC_EXT:
embed = discord.Embed(title=_("Track is not playable."))
embed.description = _(
"**{suffix}** is not a fully supported format and some "
"tracks may not play."
).format(suffix=query.suffix)
return await self.send_embed_msg(ctx, embed=embed)
else:
try:
result, called_api = await self.api_interface.fetch_track(
ctx, player, query, forced=skip_cache
)
except TrackEnqueueError:
self.update_player_lock(ctx, False)
return await self.send_embed_msg(
ctx,
title=_("Unable to Get Track"),
description=_(
"I'm unable to get a track from Lavalink node at the moment, try again in a few "
"minutes."
),
)
except Exception as e:
self.update_player_lock(ctx, False)
raise e
tracks = result.tracks
if not search and len(tracklist) == 0:
async for track in AsyncIter(tracks):
track_obj = self.get_track_json(player, other_track=track)
tracklist.append(track_obj)
elif len(tracklist) == 0:
track_obj = self.get_track_json(player, other_track=tracks[0])
tracklist.append(track_obj)
return tracklist
def humanize_scope(
self, scope: str, ctx: Union[discord.Guild, discord.abc.User, str] = None, the: bool = None
) -> Optional[str]:
if scope == PlaylistScope.GLOBAL.value:
return _("the Global") if the else _("Global")
elif scope == PlaylistScope.GUILD.value:
return ctx.name if ctx else _("the Server") if the else _("Server")
elif scope == PlaylistScope.USER.value:
return str(ctx) if ctx else _("the User") if the else _("User")
async def _get_bundled_playlist_tracks(self):
async with aiohttp.ClientSession(json_serialize=json.dumps) as session:
async with session.get(
CURATED_DATA + f"?timestamp={int(time.time())}",
headers={"content-type": "application/json"},
) as response:
if response.status != 200:
return 0, []
try:
data = json.loads(await response.read())
except Exception as exc:
log.error(
"Curated playlist couldn't be parsed, report this error.", exc_info=exc
)
data = {}
web_version = data.get("version", 0)
entries = data.get("entries", [])
if entries:
random.shuffle(entries)
tracks = []
async for entry in AsyncIter(entries, steps=25):
with contextlib.suppress(Exception):
tracks.append(self.decode_track(entry))
return web_version, tracks
async def _build_bundled_playlist(self, forced=False):
current_version = await self.config.bundled_playlist_version()
web_version, tracks = await self._get_bundled_playlist_tracks()
if not forced and current_version >= web_version:
return
playlist_data = dict()
playlist_data["name"] = "Aikaterna's curated tracks"
playlist_data["tracks"] = tracks
playlist = await PlaylistCompat23.from_json(
bot=self.bot,
playlist_api=self.playlist_api,
scope=PlaylistScope.GLOBAL.value,
playlist_number=42069,
data=playlist_data,
guild=None,
author=self.bot.user.id,
)
await playlist.save()
await self.config.bundled_playlist_version.set(web_version)
log.info("Curated playlist has been updated.")