redbot/cogs/streams/streams.py
import discord
from redbot.core.utils.chat_formatting import humanize_list
from redbot.core.bot import Red
from redbot.core import commands, Config
from redbot.core.i18n import cog_i18n, Translator, set_contextual_locales_from_guild
from redbot.core.utils._internal_utils import send_to_owners_with_prefix_replaced
from redbot.core.utils.chat_formatting import escape, inline, pagify
from .streamtypes import (
PicartoStream,
Stream,
TwitchStream,
YoutubeStream,
)
from .errors import (
APIError,
InvalidTwitchCredentials,
InvalidYoutubeCredentials,
OfflineStream,
StreamNotFound,
StreamsError,
YoutubeQuotaExceeded,
)
from . import streamtypes as _streamtypes
import re
import logging
import asyncio
import aiohttp
import contextlib
from datetime import datetime
from collections import defaultdict
from typing import Optional, List, Tuple, Union, Dict
MAX_RETRY_COUNT = 10
_ = Translator("Streams", __file__)
log = logging.getLogger("red.core.cogs.Streams")
@cog_i18n(_)
class Streams(commands.Cog):
"""Various commands relating to streaming platforms.
You can check if a Twitch, YouTube or Picarto stream is
currently live.
"""
global_defaults = {
"refresh_timer": 300,
"tokens": {},
"streams": [],
"notified_owner_missing_twitch_secret": False,
}
guild_defaults = {
"autodelete": False,
"mention_everyone": False,
"mention_here": False,
"live_message_mention": False,
"live_message_nomention": False,
"ignore_reruns": False,
"ignore_schedule": False,
"use_buttons": False,
}
role_defaults = {"mention": False}
def __init__(self, bot: Red):
super().__init__()
self.config: Config = Config.get_conf(self, 26262626)
self.ttv_bearer_cache: dict = {}
self.config.register_global(**self.global_defaults)
self.config.register_guild(**self.guild_defaults)
self.config.register_role(**self.role_defaults)
self.bot: Red = bot
self.streams: List[Stream] = []
self.task: Optional[asyncio.Task] = None
self.yt_cid_pattern = re.compile("^UC[-_A-Za-z0-9]{21}[AQgw]$")
async def red_delete_data_for_user(self, **kwargs):
"""Nothing to delete"""
return
def check_name_or_id(self, data: str) -> bool:
matched = self.yt_cid_pattern.fullmatch(data)
if matched is None:
return True
return False
async def cog_load(self) -> None:
"""Should be called straight after cog instantiation."""
try:
await self.move_api_keys()
await self.get_twitch_bearer_token()
self.streams = await self.load_streams()
self.task = asyncio.create_task(self._stream_alerts())
except Exception as error:
log.exception("Failed to initialize Streams cog:", exc_info=error)
@commands.Cog.listener()
async def on_red_api_tokens_update(self, service_name, api_tokens):
if service_name == "twitch":
await self.get_twitch_bearer_token(api_tokens)
async def move_api_keys(self) -> None:
"""Move the API keys from cog stored config to core bot config if they exist."""
tokens = await self.config.tokens()
youtube = await self.bot.get_shared_api_tokens("youtube")
twitch = await self.bot.get_shared_api_tokens("twitch")
for token_type, token in tokens.items():
if token_type == "YoutubeStream" and "api_key" not in youtube:
await self.bot.set_shared_api_tokens("youtube", api_key=token)
if token_type == "TwitchStream" and "client_id" not in twitch:
# Don't need to check Community since they're set the same
await self.bot.set_shared_api_tokens("twitch", client_id=token)
await self.config.tokens.clear()
async def _notify_owner_about_missing_twitch_secret(self) -> None:
message = _(
"You need a client secret key if you want to use the Twitch API on this cog.\n"
"Follow these steps:\n"
"1. Go to this page: {link}.\n"
'2. Click "Manage" on your application.\n'
'3. Click on "New secret".\n'
"5. Copy your client ID and your client secret into:\n"
"{command}"
"\n\n"
"Note: These tokens are sensitive and should only be used in a private channel "
"or in DM with the bot."
).format(
link="https://dev.twitch.tv/console/apps",
command=inline(
"[p]set api twitch client_id {} client_secret {}".format(
_("<your_client_id_here>"), _("<your_client_secret_here>")
)
),
)
await send_to_owners_with_prefix_replaced(self.bot, message)
await self.config.notified_owner_missing_twitch_secret.set(True)
async def get_twitch_bearer_token(self, api_tokens: Optional[Dict] = None) -> None:
tokens = (
await self.bot.get_shared_api_tokens("twitch") if api_tokens is None else api_tokens
)
if tokens.get("client_id"):
notified_owner_missing_twitch_secret = (
await self.config.notified_owner_missing_twitch_secret()
)
try:
tokens["client_secret"]
if notified_owner_missing_twitch_secret is True:
await self.config.notified_owner_missing_twitch_secret.set(False)
except KeyError:
if notified_owner_missing_twitch_secret is False:
asyncio.create_task(self._notify_owner_about_missing_twitch_secret())
async with aiohttp.ClientSession() as session:
async with session.post(
"https://id.twitch.tv/oauth2/token",
params={
"client_id": tokens.get("client_id", ""),
"client_secret": tokens.get("client_secret", ""),
"grant_type": "client_credentials",
},
) as req:
try:
data = await req.json()
except aiohttp.ContentTypeError:
data = {}
if req.status == 200:
pass
elif req.status == 400 and data.get("message") == "invalid client":
log.error(
"Twitch API request failed authentication: set Client ID is invalid."
)
elif req.status == 403 and data.get("message") == "invalid client secret":
log.error(
"Twitch API request failed authentication: set Client Secret is invalid."
)
elif "message" in data:
log.error(
"Twitch OAuth2 API request failed with status code %s"
" and error message: %s",
req.status,
data["message"],
)
else:
log.error("Twitch OAuth2 API request failed with status code %s", req.status)
if req.status != 200:
return
self.ttv_bearer_cache = data
self.ttv_bearer_cache["expires_at"] = datetime.now().timestamp() + data.get("expires_in")
async def maybe_renew_twitch_bearer_token(self) -> None:
if self.ttv_bearer_cache:
if self.ttv_bearer_cache["expires_at"] - datetime.now().timestamp() <= 60:
await self.get_twitch_bearer_token()
@commands.guild_only()
@commands.command()
async def twitchstream(self, ctx: commands.Context, channel_name: str):
"""Check if a Twitch channel is live."""
await self.maybe_renew_twitch_bearer_token()
token = (await self.bot.get_shared_api_tokens("twitch")).get("client_id")
stream = TwitchStream(
_bot=self.bot,
name=channel_name,
token=token,
bearer=self.ttv_bearer_cache.get("access_token", None),
)
await self.check_online(ctx, stream)
@commands.guild_only()
@commands.command()
@commands.cooldown(1, 30, commands.BucketType.guild)
async def youtubestream(self, ctx: commands.Context, channel_id_or_name: str):
"""Check if a YouTube channel is live."""
# TODO: Write up a custom check to look up cooldown set by botowner
# This check is here to avoid people spamming this command and eating up quota
apikey = await self.bot.get_shared_api_tokens("youtube")
is_name = self.check_name_or_id(channel_id_or_name)
if is_name:
stream = YoutubeStream(
_bot=self.bot, name=channel_id_or_name, token=apikey, config=self.config
)
else:
stream = YoutubeStream(
_bot=self.bot, id=channel_id_or_name, token=apikey, config=self.config
)
await self.check_online(ctx, stream)
@commands.guild_only()
@commands.command()
async def picarto(self, ctx: commands.Context, channel_name: str):
"""Check if a Picarto channel is live."""
stream = PicartoStream(_bot=self.bot, name=channel_name)
await self.check_online(ctx, stream)
async def check_online(
self,
ctx: commands.Context,
stream: Union[PicartoStream, YoutubeStream, TwitchStream],
):
try:
info = await stream.is_online()
except OfflineStream:
await ctx.send(_("That user is offline."))
except StreamNotFound:
await ctx.send(_("That user doesn't seem to exist."))
except InvalidTwitchCredentials:
await ctx.send(
_("The Twitch token is either invalid or has not been set. See {command}.").format(
command=inline(f"{ctx.clean_prefix}streamset twitchtoken")
)
)
except InvalidYoutubeCredentials:
await ctx.send(
_(
"The YouTube API key is either invalid or has not been set. See {command}."
).format(command=inline(f"{ctx.clean_prefix}streamset youtubekey"))
)
except YoutubeQuotaExceeded:
await ctx.send(
_(
"YouTube quota has been exceeded."
" Try again later or contact the owner if this continues."
)
)
except APIError as e:
log.error(
"Something went wrong whilst trying to contact the stream service's API.\n"
"Raw response data:\n%r",
e,
)
await ctx.send(
_("Something went wrong whilst trying to contact the stream service's API.")
)
else:
if isinstance(info, tuple):
embed, is_rerun = info
ignore_reruns = await self.config.guild(ctx.channel.guild).ignore_reruns()
if ignore_reruns and is_rerun:
await ctx.send(_("That user is offline."))
return
else:
embed = info
use_buttons: bool = await self.config.guild(ctx.channel.guild).use_buttons()
view = None
if use_buttons:
stream_url = embed.url
view = discord.ui.View()
view.add_item(
discord.ui.Button(
label=_("Watch the stream"), style=discord.ButtonStyle.link, url=stream_url
)
)
await ctx.send(embed=embed, view=view)
@commands.group()
@commands.guild_only()
@commands.mod_or_permissions(manage_channels=True)
async def streamalert(self, ctx: commands.Context):
"""Manage automated stream alerts."""
pass
@streamalert.group(name="twitch", invoke_without_command=True)
async def _twitch(
self,
ctx: commands.Context,
channel_name: str,
discord_channel: Union[
discord.TextChannel, discord.VoiceChannel, discord.StageChannel
] = commands.CurrentChannel,
):
"""Manage Twitch stream notifications."""
await ctx.invoke(self.twitch_alert_channel, channel_name, discord_channel)
@_twitch.command(name="channel")
async def twitch_alert_channel(
self,
ctx: commands.Context,
channel_name: str,
discord_channel: Union[
discord.TextChannel, discord.VoiceChannel, discord.StageChannel
] = commands.CurrentChannel,
):
"""Toggle alerts in this or the given channel for a Twitch stream."""
if re.fullmatch(r"<#\d+>", channel_name):
await ctx.send(
_("Please supply the name of a *Twitch* channel, not a Discord channel.")
)
return
await self.stream_alert(ctx, TwitchStream, channel_name.lower(), discord_channel)
@streamalert.command(name="youtube")
async def youtube_alert(
self,
ctx: commands.Context,
channel_name_or_id: str,
discord_channel: Union[
discord.TextChannel, discord.VoiceChannel, discord.StageChannel
] = commands.CurrentChannel,
):
"""Toggle alerts in this channel for a YouTube stream."""
await self.stream_alert(ctx, YoutubeStream, channel_name_or_id, discord_channel)
@streamalert.command(name="picarto")
async def picarto_alert(
self,
ctx: commands.Context,
channel_name: str,
discord_channel: Union[
discord.TextChannel, discord.VoiceChannel, discord.StageChannel
] = commands.CurrentChannel,
):
"""Toggle alerts in this channel for a Picarto stream."""
await self.stream_alert(ctx, PicartoStream, channel_name, discord_channel)
@streamalert.command(name="stop", usage="[disable_all=No]")
async def streamalert_stop(self, ctx: commands.Context, _all: bool = False):
"""Disable all stream alerts in this channel or server.
`[p]streamalert stop` will disable this channel's stream
alerts.
Do `[p]streamalert stop yes` to disable all stream alerts in
this server.
"""
streams = self.streams.copy()
local_channel_ids = [c.id for c in ctx.guild.channels]
to_remove = []
for stream in streams:
for channel_id in stream.channels:
if channel_id == ctx.channel.id:
stream.channels.remove(channel_id)
elif _all and ctx.channel.id in local_channel_ids:
if channel_id in stream.channels:
stream.channels.remove(channel_id)
if not stream.channels:
to_remove.append(stream)
for stream in to_remove:
streams.remove(stream)
self.streams = streams
await self.save_streams()
if _all:
msg = _("All the stream alerts in this server have been disabled.")
else:
msg = _("All the stream alerts in this channel have been disabled.")
await ctx.send(msg)
@streamalert.command(name="list")
async def streamalert_list(self, ctx: commands.Context):
"""List all active stream alerts in this server."""
streams_list = defaultdict(lambda: defaultdict(list))
guild_channels_ids = [c.id for c in ctx.guild.channels]
msg = _("Active alerts:\n\n")
for stream in self.streams:
for channel_id in stream.channels:
if channel_id in guild_channels_ids:
streams_list[channel_id][stream.platform_name].append(stream.name.lower())
if not streams_list:
await ctx.send(_("There are no active alerts in this server."))
return
for channel_id, stream_platform in streams_list.items():
msg += f"- {ctx.guild.get_channel(channel_id).mention}\n"
for platform, streams in stream_platform.items():
msg += f" - **{platform}**\n"
msg += f" {humanize_list(streams)}\n"
for page in pagify(msg):
await ctx.send(page)
async def stream_alert(self, ctx: commands.Context, _class, channel_name, discord_channel):
if isinstance(discord_channel, discord.Thread):
await ctx.send("Stream alerts cannot be set up in threads.")
return
stream = self.get_stream(_class, channel_name)
if not stream:
token = await self.bot.get_shared_api_tokens(_class.token_name)
is_yt = _class.__name__ == "YoutubeStream"
is_twitch = _class.__name__ == "TwitchStream"
if is_yt and not self.check_name_or_id(channel_name):
stream = _class(_bot=self.bot, id=channel_name, token=token, config=self.config)
elif is_twitch:
await self.maybe_renew_twitch_bearer_token()
stream = _class(
_bot=self.bot,
name=channel_name,
token=token.get("client_id"),
bearer=self.ttv_bearer_cache.get("access_token", None),
)
else:
if is_yt:
stream = _class(
_bot=self.bot, name=channel_name, token=token, config=self.config
)
else:
stream = _class(_bot=self.bot, name=channel_name, token=token)
try:
exists = await self.check_exists(stream)
except InvalidTwitchCredentials:
await ctx.send(
_(
"The Twitch token is either invalid or has not been set. See {command}."
).format(command=inline(f"{ctx.clean_prefix}streamset twitchtoken"))
)
return
except InvalidYoutubeCredentials:
await ctx.send(
_(
"The YouTube API key is either invalid or has not been set. See "
"{command}."
).format(command=inline(f"{ctx.clean_prefix}streamset youtubekey"))
)
return
except YoutubeQuotaExceeded:
await ctx.send(
_(
"YouTube quota has been exceeded."
" Try again later or contact the owner if this continues."
)
)
except APIError as e:
log.error(
"Something went wrong whilst trying to contact the stream service's API.\n"
"Raw response data:\n%r",
e,
)
await ctx.send(
_("Something went wrong whilst trying to contact the stream service's API.")
)
return
else:
if not exists:
await ctx.send(_("That user doesn't seem to exist."))
return
await self.add_or_remove(ctx, stream, discord_channel)
@commands.group()
@commands.mod_or_permissions(manage_channels=True)
async def streamset(self, ctx: commands.Context):
"""Manage stream alert settings."""
pass
@streamset.command(name="timer")
@commands.is_owner()
async def _streamset_refresh_timer(self, ctx: commands.Context, refresh_time: int):
"""Set stream check refresh time."""
if refresh_time < 60:
return await ctx.send(_("You cannot set the refresh timer to less than 60 seconds"))
await self.config.refresh_timer.set(refresh_time)
await ctx.send(
_("Refresh timer set to {refresh_time} seconds".format(refresh_time=refresh_time))
)
@streamset.command()
@commands.is_owner()
async def twitchtoken(self, ctx: commands.Context):
"""Explain how to set the twitch token."""
message = _(
"To set the twitch API tokens, follow these steps:\n"
"1. Go to this page: {link}.\n"
"2. Click *Register Your Application*.\n"
"3. Enter a name, set the OAuth Redirect URI to {localhost}, and "
"select an Application Category of your choosing.\n"
"4. Click *Register*.\n"
"5. Copy your client ID and your client secret into:\n"
"{command}"
"\n\n"
"Note: These tokens are sensitive and should only be used in a private channel\n"
"or in DM with the bot.\n"
).format(
link="https://dev.twitch.tv/dashboard/apps",
localhost=inline("http://localhost"),
command="`{}set api twitch client_id {} client_secret {}`".format(
ctx.clean_prefix, _("<your_client_id_here>"), _("<your_client_secret_here>")
),
)
await ctx.maybe_send_embed(message)
@streamset.command()
@commands.is_owner()
async def youtubekey(self, ctx: commands.Context):
"""Explain how to set the YouTube token."""
message = _(
"To get one, do the following:\n"
"1. Create a project\n"
"(see {link1} for details)\n"
"2. Enable the YouTube Data API v3 \n"
"(see {link2} for instructions)\n"
"3. Set up your API key \n"
"(see {link3} for instructions)\n"
"4. Copy your API key and run the command "
"{command}\n\n"
"Note: These tokens are sensitive and should only be used in a private channel\n"
"or in DM with the bot.\n"
).format(
link1="https://support.google.com/googleapi/answer/6251787",
link2="https://support.google.com/googleapi/answer/6158841",
link3="https://support.google.com/googleapi/answer/6158862",
command="`{}set api youtube api_key {}`".format(
ctx.clean_prefix, _("<your_api_key_here>")
),
)
await ctx.maybe_send_embed(message)
@streamset.group()
@commands.guild_only()
async def message(self, ctx: commands.Context):
"""Manage custom messages for stream alerts."""
pass
@message.command(name="mention")
@commands.guild_only()
async def with_mention(self, ctx: commands.Context, *, message: str):
"""Set stream alert message when mentions are enabled.
Use `{mention}` in the message to insert the selected mentions.
Use `{stream}` in the message to insert the channel or username.
Use `{stream.display_name}` in the message to insert the channel's display name (on Twitch, this may be different from `{stream}`).
For example: `[p]streamset message mention {mention}, {stream.display_name} is live!`
"""
guild = ctx.guild
await self.config.guild(guild).live_message_mention.set(message)
await ctx.send(_("Stream alert message set!"))
@message.command(name="nomention")
@commands.guild_only()
async def without_mention(self, ctx: commands.Context, *, message: str):
"""Set stream alert message when mentions are disabled.
Use `{stream}` in the message to insert the channel or username.
Use `{stream.display_name}` in the message to insert the channel's display name (on Twitch, this may be different from `{stream}`).
For example: `[p]streamset message nomention {stream.display_name} is live!`
"""
guild = ctx.guild
await self.config.guild(guild).live_message_nomention.set(message)
await ctx.send(_("Stream alert message set!"))
@message.command(name="clear")
@commands.guild_only()
async def clear_message(self, ctx: commands.Context):
"""Reset the stream alert messages in this server."""
guild = ctx.guild
await self.config.guild(guild).live_message_mention.set(False)
await self.config.guild(guild).live_message_nomention.set(False)
await ctx.send(_("Stream alerts in this server will now use the default alert message."))
@streamset.group()
@commands.guild_only()
async def mention(self, ctx: commands.Context):
"""Manage mention settings for stream alerts."""
pass
@mention.command(aliases=["everyone"])
@commands.guild_only()
async def all(self, ctx: commands.Context):
"""Toggle the `@\u200beveryone` mention."""
guild = ctx.guild
current_setting = await self.config.guild(guild).mention_everyone()
if current_setting:
await self.config.guild(guild).mention_everyone.set(False)
await ctx.send(
_("{everyone} will no longer be mentioned for stream alerts.").format(
everyone=inline("@\u200beveryone")
)
)
else:
await self.config.guild(guild).mention_everyone.set(True)
await ctx.send(
_("When a stream is live, {everyone} will be mentioned.").format(
everyone=inline("@\u200beveryone")
)
)
@mention.command(aliases=["here"])
@commands.guild_only()
async def online(self, ctx: commands.Context):
"""Toggle the `@\u200bhere` mention."""
guild = ctx.guild
current_setting = await self.config.guild(guild).mention_here()
if current_setting:
await self.config.guild(guild).mention_here.set(False)
await ctx.send(
_("{here} will no longer be mentioned for stream alerts.").format(
here=inline("@\u200bhere")
)
)
else:
await self.config.guild(guild).mention_here.set(True)
await ctx.send(
_("When a stream is live, {here} will be mentioned.").format(
here=inline("@\u200bhere")
)
)
@mention.command()
@commands.guild_only()
async def role(self, ctx: commands.Context, *, role: discord.Role):
"""Toggle a role mention."""
current_setting = await self.config.role(role).mention()
if current_setting:
await self.config.role(role).mention.set(False)
await ctx.send(
_("{role} will no longer be mentioned for stream alerts.").format(
role=inline(f"@\u200b{role.name}")
)
)
else:
await self.config.role(role).mention.set(True)
msg = _("When a stream or community is live, {role} will be mentioned.").format(
role=inline(f"@\u200b{role.name}")
)
if not role.mentionable:
msg += " " + _(
"Since the role is not mentionable, it will be momentarily made mentionable "
"when announcing a streamalert. Please make sure I have the correct "
"permissions to manage this role, or else members of this role won't receive "
"a notification."
)
await ctx.send(msg)
@streamset.command()
@commands.guild_only()
async def autodelete(self, ctx: commands.Context, on_off: bool):
"""Toggle alert deletion for when streams go offline."""
await self.config.guild(ctx.guild).autodelete.set(on_off)
if on_off:
await ctx.send(_("The notifications will be deleted once streams go offline."))
else:
await ctx.send(_("Notifications will no longer be deleted."))
@streamset.command(name="ignorereruns")
@commands.guild_only()
async def ignore_reruns(self, ctx: commands.Context):
"""Toggle excluding rerun streams from alerts."""
guild = ctx.guild
current_setting = await self.config.guild(guild).ignore_reruns()
if current_setting:
await self.config.guild(guild).ignore_reruns.set(False)
await ctx.send(_("Streams of type 'rerun' will be included in alerts."))
else:
await self.config.guild(guild).ignore_reruns.set(True)
await ctx.send(_("Streams of type 'rerun' will no longer send an alert."))
@streamset.command(name="ignoreschedule")
@commands.guild_only()
async def ignore_schedule(self, ctx: commands.Context):
"""Toggle excluding YouTube streams schedules from alerts."""
guild = ctx.guild
current_setting = await self.config.guild(guild).ignore_schedule()
if current_setting:
await self.config.guild(guild).ignore_schedule.set(False)
await ctx.send(_("Streams schedules will be included in alerts."))
else:
await self.config.guild(guild).ignore_schedule.set(True)
await ctx.send(_("Streams schedules will no longer send an alert."))
@streamset.command(name="usebuttons")
@commands.guild_only()
async def use_buttons(self, ctx: commands.Context):
"""Toggle whether to use buttons for stream alerts."""
guild = ctx.guild
current_setting: bool = await self.config.guild(guild).use_buttons()
if current_setting:
await self.config.guild(guild).use_buttons.set(False)
await ctx.send(_("I will no longer use buttons in stream alerts."))
else:
await self.config.guild(guild).use_buttons.set(True)
await ctx.send(_("I will use buttons in stream alerts."))
async def add_or_remove(self, ctx: commands.Context, stream, discord_channel):
if discord_channel.id not in stream.channels:
stream.channels.append(discord_channel.id)
if stream not in self.streams:
self.streams.append(stream)
await ctx.send(
_(
"I'll now send a notification in the {channel.mention} channel"
" when {stream.name} is live."
).format(stream=stream, channel=discord_channel)
)
else:
stream.channels.remove(discord_channel.id)
if not stream.channels:
self.streams.remove(stream)
await ctx.send(
_(
"I won't send notifications about {stream.name}"
" in the {channel.mention} channel anymore"
).format(stream=stream, channel=discord_channel)
)
await self.save_streams()
def get_stream(self, _class, name):
for stream in self.streams:
# if isinstance(stream, _class) and stream.name == name:
# return stream
# Reloading this cog causes an issue with this check ^
# isinstance will always return False
# As a workaround, we'll compare the class' name instead.
# Good enough.
if _class.__name__ == "YoutubeStream" and stream.type == _class.__name__:
# Because name could be a username or a channel id
if self.check_name_or_id(name) and stream.name.lower() == name.lower():
return stream
elif not self.check_name_or_id(name) and stream.id == name:
return stream
elif stream.type == _class.__name__ and stream.name.lower() == name.lower():
return stream
@staticmethod
async def check_exists(stream):
try:
await stream.is_online()
except OfflineStream:
pass
except StreamNotFound:
return False
except StreamsError:
raise
return True
async def _stream_alerts(self):
await self.bot.wait_until_ready()
while True:
await self.check_streams()
await asyncio.sleep(await self.config.refresh_timer())
async def _send_stream_alert(
self,
stream,
channel: Union[discord.TextChannel, discord.VoiceChannel, discord.StageChannel],
embed: discord.Embed,
content: str = None,
*,
is_schedule: bool = False,
):
use_buttons: bool = await self.config.guild(channel.guild).use_buttons()
view = None
if use_buttons:
stream_url = embed.url
view = discord.ui.View()
view.add_item(
discord.ui.Button(
label=_("Watch the stream"), style=discord.ButtonStyle.link, url=stream_url
)
)
m = await channel.send(
content,
embed=embed,
allowed_mentions=discord.AllowedMentions(roles=True, everyone=True),
view=view,
)
message_data = {"guild": m.guild.id, "channel": m.channel.id, "message": m.id}
if is_schedule:
message_data["is_schedule"] = True
stream.messages.append(message_data)
async def check_streams(self):
to_remove = []
for stream in self.streams:
try:
try:
is_rerun = False
is_schedule = False
if stream.__class__.__name__ == "TwitchStream":
await self.maybe_renew_twitch_bearer_token()
embed, is_rerun = await stream.is_online()
elif stream.__class__.__name__ == "YoutubeStream":
embed, is_schedule = await stream.is_online()
else:
embed = await stream.is_online()
except StreamNotFound:
if stream.retry_count > MAX_RETRY_COUNT:
log.info("Stream with name %s no longer exists. Removing...", stream.name)
to_remove.append(stream)
else:
log.info(
"Stream with name %s seems to not exist, will retry later", stream.name
)
stream.retry_count += 1
continue
except OfflineStream:
if not stream.messages:
continue
for msg_data in stream.iter_messages():
partial_msg = msg_data["partial_message"]
if partial_msg is None:
continue
if await self.bot.cog_disabled_in_guild(self, partial_msg.guild):
continue
if not await self.config.guild(partial_msg.guild).autodelete():
continue
with contextlib.suppress(discord.NotFound):
await partial_msg.delete()
stream.messages.clear()
await self.save_streams()
except APIError as e:
log.error(
"Something went wrong whilst trying to contact the stream service's API.\n"
"Raw response data:\n%r",
e,
)
continue
else:
if stream.messages:
continue
for channel_id in stream.channels:
channel = self.bot.get_channel(channel_id)
if not channel:
continue
if await self.bot.cog_disabled_in_guild(self, channel.guild):
continue
guild_data = await self.config.guild(channel.guild).all()
if guild_data["ignore_reruns"] and is_rerun:
continue
if guild_data["ignore_schedule"] and is_schedule:
continue
if is_schedule:
# skip messages and mentions
await self._send_stream_alert(stream, channel, embed, is_schedule=True)
await self.save_streams()
continue
await set_contextual_locales_from_guild(self.bot, channel.guild)
mention_str, edited_roles = await self._get_mention_str(
channel.guild, channel, guild_data
)
if mention_str:
if guild_data["live_message_mention"]:
# Stop bad things from happening here...
content = guild_data["live_message_mention"]
content = content.replace(
"{stream.name}", str(stream.name)
) # Backwards compatibility
content = content.replace(
"{stream.display_name}", str(stream.display_name)
)
content = content.replace("{stream}", str(stream.name))
content = content.replace("{mention}", mention_str)
else:
content = _("{mention}, {display_name} is live!").format(
mention=mention_str,
display_name=escape(
str(stream.display_name),
mass_mentions=True,
formatting=True,
),
)
else:
if guild_data["live_message_nomention"]:
# Stop bad things from happening here...
content = guild_data["live_message_nomention"]
content = content.replace(
"{stream.name}", str(stream.name)
) # Backwards compatibility
content = content.replace(
"{stream.display_name}", str(stream.display_name)
)
content = content.replace("{stream}", str(stream.name))
else:
content = _("{display_name} is live!").format(
display_name=escape(
str(stream.display_name),
mass_mentions=True,
formatting=True,
)
)
await self._send_stream_alert(stream, channel, embed, content)
if edited_roles:
for role in edited_roles:
await role.edit(mentionable=False)
await self.save_streams()
except Exception as e:
log.error("An error has occurred with Streams. Please report it.", exc_info=e)
if to_remove:
for stream in to_remove:
self.streams.remove(stream)
await self.save_streams()
async def _get_mention_str(
self,
guild: discord.Guild,
channel: Union[discord.TextChannel, discord.VoiceChannel, discord.StageChannel],
guild_data: dict,
) -> Tuple[str, List[discord.Role]]:
"""Returns a 2-tuple with the string containing the mentions, and a list of
all roles which need to have their `mentionable` property set back to False.
"""
mentions = []
edited_roles = []
if guild_data["mention_everyone"]:
mentions.append("@everyone")
if guild_data["mention_here"]:
mentions.append("@here")
can_manage_roles = guild.me.guild_permissions.manage_roles
can_mention_everyone = channel.permissions_for(guild.me).mention_everyone
for role in guild.roles:
if await self.config.role(role).mention():
if not can_mention_everyone and can_manage_roles and not role.mentionable:
try:
await role.edit(mentionable=True)
except discord.Forbidden:
# Might still be unable to edit role based on hierarchy
pass
else:
edited_roles.append(role)
mentions.append(role.mention)
return " ".join(mentions), edited_roles
async def filter_streams(
self,
streams: list,
channel: Union[discord.TextChannel, discord.VoiceChannel, discord.StageChannel],
) -> list:
filtered = []
for stream in streams:
tw_id = str(stream["channel"]["_id"])
for alert in self.streams:
if isinstance(alert, TwitchStream) and alert.id == tw_id:
if channel.id in alert.channels:
break
else:
filtered.append(stream)
return filtered
async def load_streams(self):
streams = []
for raw_stream in await self.config.streams():
_class = getattr(_streamtypes, raw_stream["type"], None)
if not _class:
continue
token = await self.bot.get_shared_api_tokens(_class.token_name)
if token:
if _class.__name__ == "TwitchStream":
raw_stream["token"] = token.get("client_id")
raw_stream["bearer"] = self.ttv_bearer_cache.get("access_token", None)
else:
if _class.__name__ == "YoutubeStream":
raw_stream["config"] = self.config
raw_stream["token"] = token
raw_stream["_bot"] = self.bot
streams.append(_class(**raw_stream))
return streams
async def save_streams(self):
raw_streams = []
for stream in self.streams:
raw_streams.append(stream.export())
await self.config.streams.set(raw_streams)
def cog_unload(self):
if self.task:
self.task.cancel()