general/reactionrole/cog.py
from typing import Dict, Optional, Set, Tuple
from discord import Embed, Forbidden, HTTPException, Member, Message, NotFound, PartialEmoji, Role, TextChannel
from discord.ext import commands
from discord.ext.commands import CommandError, Context, UserInputError, guild_only
from PyDrocsid.cog import Cog
from PyDrocsid.command import add_reactions, docs, reply
from PyDrocsid.converter import EmojiConverter
from PyDrocsid.database import db, filter_by, select
from PyDrocsid.embeds import send_long_embed
from PyDrocsid.events import StopEventHandling
from PyDrocsid.logger import get_logger
from PyDrocsid.translations import t
from PyDrocsid.util import check_role_assignable
from .colors import Colors
from .models import ReactionRole
from .permissions import ReactionRolePermission
from ...contributor import Contributor
from ...pubsub import send_to_changelog
tg = t.g
t = t.reactionrole
logger = get_logger(__name__)
async def get_role(message: Message, emoji: PartialEmoji) -> tuple[Optional[Role], Optional[ReactionRole]]:
link: Optional[ReactionRole] = await ReactionRole.get(message.channel.id, message.id, str(emoji))
if link is None:
return None, None
role: Optional[Role] = message.guild.get_role(link.role_id)
if role is None:
await db.delete(link)
return None, None
return role, link
class ReactionRoleCog(Cog, name="ReactionRole"):
CONTRIBUTORS = [Contributor.Defelo, Contributor.wolflu]
async def on_raw_reaction_add(self, message: Message, emoji: PartialEmoji, member: Member):
if member.bot or message.guild is None:
return
role, link = await get_role(message, emoji)
if not role or not link:
return
try:
if link.reverse:
await member.remove_roles(role)
else:
await member.add_roles(role)
except (Forbidden, HTTPException):
raise PermissionError(
message.guild, t.manage_role_error(role=role, member=member, message=message, emoji=emoji)
)
if link.auto_remove:
try:
await message.remove_reaction(emoji, member)
except (HTTPException, Forbidden, NotFound):
raise PermissionError(
message.guild, t.remove_emoji_error(role=role, member=member, message=message, emoji=emoji)
)
raise StopEventHandling
async def on_raw_reaction_remove(self, message: Message, emoji: PartialEmoji, member: Member):
if member.bot or message.guild is None:
return
role, link = await get_role(message, emoji)
if not role or not link or link.auto_remove:
return
try:
if link.reverse:
await member.add_roles(role)
else:
await member.remove_roles(role)
except (Forbidden, HTTPException):
raise PermissionError(
message.guild, t.manage_role_error(role=role, member=member, message=message, emoji=emoji)
)
raise StopEventHandling
@commands.group(aliases=["rr"])
@ReactionRolePermission.read.check
@guild_only()
@docs(t.commands.reactionrole)
async def reactionrole(self, ctx: Context):
if ctx.subcommand_passed is not None:
if ctx.invoked_subcommand is None:
raise UserInputError
return
embed = Embed(title=t.reactionrole, colour=Colors.ReactionRole)
channels: Dict[TextChannel, Dict[Message, Set[str]]] = {}
message_cache: Dict[Tuple[int, int], Message] = {}
async for link in await db.stream(select(ReactionRole)): # type: ReactionRole
channel: Optional[TextChannel] = ctx.guild.get_channel(link.channel_id)
if channel is None:
await db.delete(link)
continue
key = link.channel_id, link.message_id
if key not in message_cache:
try:
message_cache[key] = await channel.fetch_message(link.message_id)
except HTTPException:
await db.delete(link)
continue
msg = message_cache[key]
if ctx.guild.get_role(link.role_id) is None:
await db.delete(link)
continue
channels.setdefault(channel, {}).setdefault(msg, set())
channels[channel][msg].add(link.emoji)
if not channels:
embed.colour = Colors.error
embed.description = t.no_reactionrole_links
else:
out = []
for channel, messages in channels.items():
value = channel.mention + "\n"
for msg, emojis in messages.items():
value += f"[{msg.id}]({msg.jump_url}): {' '.join(emojis)}\n"
out.append(value)
embed.description = "\n".join(out)
await send_long_embed(ctx, embed)
@reactionrole.command(name="list", aliases=["l", "?"])
@docs(t.commands.reactionrole_list)
async def reactionrole_list(self, ctx: Context, msg: Message):
embed = Embed(title=t.reactionrole, colour=Colors.ReactionRole)
out = []
link: ReactionRole
async for link in await db.stream(select(ReactionRole).filter_by(channel_id=msg.channel.id, message_id=msg.id)):
channel: Optional[TextChannel] = ctx.guild.get_channel(link.channel_id)
if channel is None:
await db.delete(link)
continue
try:
await channel.fetch_message(link.message_id)
except HTTPException:
await db.delete(link)
continue
role: Optional[Role] = ctx.guild.get_role(link.role_id)
if role is None:
await db.delete(link)
continue
flags = [t.reverse] * link.reverse + [t.auto_remove] * link.auto_remove
out.append(t.rr_link(link.emoji, role.mention) + f" ({', '.join(flags)})" * bool(flags))
if not out:
embed.colour = Colors.error
embed.description = t.no_reactionrole_links_for_msg
else:
embed.description = "\n".join(out)
await send_long_embed(ctx, embed)
@reactionrole.command(name="add", aliases=["a", "+"])
@ReactionRolePermission.write.check
@docs(t.commands.reactionrole_add)
async def reactionrole_add(
self, ctx: Context, msg: Message, emoji: EmojiConverter, role: Role, reverse: bool, auto_remove: bool
):
emoji: PartialEmoji
if await ReactionRole.get(msg.channel.id, msg.id, str(emoji)):
raise CommandError(t.rr_link_already_exists)
if not msg.channel.permissions_for(msg.guild.me).add_reactions:
raise CommandError(t.rr_link_not_created_no_permissions)
check_role_assignable(role)
try:
await msg.add_reaction(emoji)
except Forbidden:
raise CommandError(t.could_not_add_reactions)
await ReactionRole.create(msg.channel.id, msg.id, str(emoji), role.id, reverse, auto_remove)
embed = Embed(title=t.reactionrole, colour=Colors.ReactionRole, description=t.rr_link_created)
await reply(ctx, embed=embed)
await send_to_changelog(ctx.guild, t.log_rr_link_created(emoji, role.id, msg.jump_url, msg.channel.mention))
@reactionrole.command(name="remove", aliases=["r", "del", "d", "-"])
@ReactionRolePermission.write.check
@docs(t.commands.reactionrole_remove)
async def reactionrole_remove(
self, ctx: Context, msg: Message, emoji: EmojiConverter, remove_reactions: bool = True
):
emoji: PartialEmoji
if not (link := await ReactionRole.get(msg.channel.id, msg.id, str(emoji))):
raise CommandError(t.rr_link_not_found)
await db.delete(link)
embed = Embed(title=t.reactionrole, colour=Colors.ReactionRole, description=t.rr_link_removed)
if remove_reactions:
for reaction in msg.reactions:
if str(emoji) == str(reaction.emoji):
try:
await reaction.clear()
except Forbidden:
embed.description += "\n\n:warning: " + t.could_not_remove_reactions
break
await reply(ctx, embed=embed)
await send_to_changelog(
ctx.guild, t.log_rr_link_removed(emoji, link.role_id, msg.jump_url, msg.channel.mention)
)
@reactionrole.command(name="reinitialize", aliases=["reinit"])
@ReactionRolePermission.write.check
@docs(t.commands.reactionrole_reinialize)
async def reactionrole_reinialize(self, ctx: Context, msg: Message, emoji: Optional[EmojiConverter]):
if emoji:
emoji: PartialEmoji
if not await ReactionRole.get(msg.channel.id, msg.id, str(emoji)):
raise CommandError(t.rr_link_not_found)
for reaction in msg.reactions:
if str(reaction) == str(emoji):
try:
await reaction.clear()
except Forbidden:
raise CommandError(t.could_not_remove_reactions)
break
try:
await msg.add_reaction(emoji)
except Forbidden:
raise CommandError(t.could_not_add_reactions)
await add_reactions(ctx, "white_check_mark")
return
links: list[ReactionRole] = await db.all(filter_by(ReactionRole, channel_id=msg.channel.id, message_id=msg.id))
if not links:
raise CommandError(t.rr_link_not_found)
try:
await msg.clear_reactions()
except Forbidden:
raise CommandError(t.could_not_remove_reactions)
for link in links:
try:
await msg.add_reaction(link.emoji)
except Forbidden:
raise CommandError(t.could_not_add_reactions)
await add_reactions(ctx, "white_check_mark")