PyDrocsid/cogs

View on GitHub
general/reactionpin/cog.py

Summary

Maintainability
A
0 mins
Test Coverage
from typing import Optional

from discord import Embed, Guild, HTTPException, Member, Message, MessageType, PartialEmoji, TextChannel
from discord.ext import commands
from discord.ext.commands import CommandError, Context, UserInputError, guild_only

from PyDrocsid.cog import Cog
from PyDrocsid.command import make_error, reply
from PyDrocsid.database import db, select
from PyDrocsid.emojis import name_to_emoji
from PyDrocsid.events import StopEventHandling
from PyDrocsid.settings import RoleSettings
from PyDrocsid.translations import t

from .colors import Colors
from .models import ReactionPinChannel
from .permissions import ReactionPinPermission
from .settings import ReactionPinSettings
from ...contributor import Contributor
from ...pubsub import send_to_changelog


# Keep a handle on the shared `g` translations (used below for the enabled/disabled labels).
# This must happen before `t` is rebound on the next line.
tg = t.g
# From here on `t` refers to the `reactionpin` translation namespace only.
t = t.reactionpin

# The emoji the reaction handlers compare `str(emoji)` against.
EMOJI = name_to_emoji["pushpin"]


def check_channel(channel: TextChannel):
    """Verify that the bot is allowed to manage messages in ``channel``.

    Returns ``None`` when the bot's own member has the ``manage_messages``
    permission in the channel.

    Raises:
        PermissionError: constructed with the channel's guild and the
            ``no_permission_alert`` text for this channel when the permission
            is missing.
    """
    bot_permissions = channel.permissions_for(channel.guild.me)
    if bot_permissions.manage_messages:
        return

    raise PermissionError(channel.guild, t.no_permission_alert(channel.mention))


class ReactionPinCog(Cog, name="ReactionPin"):
    # Cog that pins/unpins guild messages in response to the pushpin reaction and
    # exposes the `reactionpin` command group to manage the channel whitelist.
    # (Documented with comments rather than a class docstring so the cog's
    # description shown by help commands is not altered.)

    CONTRIBUTORS = [Contributor.Defelo, Contributor.wolflu]

    async def on_raw_reaction_add(self, message: Message, emoji: PartialEmoji, member: Member):
        """Pin ``message`` when an eligible member reacts with the pushpin emoji.

        Reactions with another emoji, from bots, or outside a guild are ignored,
        as are reactions from members without the pin permission in channels
        that are not whitelisted.

        A member may pin if they hold the pin permission, or if they authored
        the message and do not carry the configured "mute" role. Otherwise the
        reaction is removed again.

        Raises:
            StopEventHandling: whenever the pushpin reaction was processed here.
            PermissionError: propagated from ``check_channel`` if the bot cannot
                manage messages in the channel.
        """
        if str(emoji) != EMOJI or member.bot or message.guild is None:
            return

        access: bool = await ReactionPinPermission.pin.check_permissions(member)
        # The whitelist only matters for members without the pin permission,
        # so the database is not queried when access is already granted.
        if not access and not await db.exists(select(ReactionPinChannel).filter_by(channel=message.channel.id)):
            return

        blocked_role = await RoleSettings.get("mute")
        is_unmuted_author = member == message.author and all(r.id != blocked_role for r in member.roles)
        if not (access or is_unmuted_author):
            await message.remove_reaction(emoji, member)
            raise StopEventHandling

        # Only ordinary messages and replies are pinned; anything else is rejected.
        if message.type not in (MessageType.default, MessageType.reply):
            await message.remove_reaction(emoji, member)
            await message.channel.send(embed=make_error(t.msg_not_pinned_system))
            raise StopEventHandling

        check_channel(message.channel)

        try:
            await message.pin()
        except HTTPException:
            # Pinning was rejected by the API; undo the reaction and tell the channel.
            await message.remove_reaction(emoji, member)
            await message.channel.send(embed=make_error(t.msg_not_pinned_limit))

        raise StopEventHandling

    async def on_raw_reaction_remove(self, message: Message, emoji: PartialEmoji, member: Member):
        """Unpin ``message`` when an eligible member removes their pushpin reaction.

        The message is unpinned if it is currently pinned and the member either
        holds the pin permission or is the author of a message in a whitelisted
        channel. In every other case the event is left untouched.

        Raises:
            StopEventHandling: after the message has been unpinned.
            PermissionError: propagated from ``check_channel`` if the bot cannot
                manage messages in the channel.
        """
        if str(emoji) != EMOJI or member.bot or message.guild is None:
            return

        # Nothing to undo for messages that are not pinned; skip all lookups.
        if not message.pinned:
            return

        access: bool = await ReactionPinPermission.pin.check_permissions(member)
        if not access:
            if member != message.author:
                return
            # Authors may only unpin inside whitelisted channels.
            if not await db.exists(select(ReactionPinChannel).filter_by(channel=message.channel.id)):
                return

        check_channel(message.channel)
        await message.unpin()
        raise StopEventHandling

    async def on_raw_reaction_clear(self, message: Message):
        """Unpin a pinned guild ``message`` when all of its reactions are cleared.

        Raises:
            StopEventHandling: after the message has been unpinned.
            PermissionError: propagated from ``check_channel`` if the bot cannot
                manage messages in the channel.
        """
        # Leave events we do not act on (DMs, unpinned messages) to other handlers,
        # matching on_raw_reaction_remove, instead of stopping every clear event.
        if message.guild is None or not message.pinned:
            return

        check_channel(message.channel)
        await message.unpin()
        raise StopEventHandling

    async def on_self_message(self, message: Message):
        """Delete the "pinned a message" system message unless it should be kept.

        Raises:
            StopEventHandling: after a ``pins_add`` message has been deleted.
        """
        if message.guild is None:
            return

        pin_messages_enabled = await ReactionPinSettings.keep_pin_message.get()
        if not pin_messages_enabled and message.type == MessageType.pins_add:
            await message.delete()
            raise StopEventHandling

    @commands.group(aliases=["rp"])
    @ReactionPinPermission.read.check
    @guild_only()
    async def reactionpin(self, ctx: Context):
        """
        manage ReactionPin
        """

        # A subcommand token was given: either it resolved (nothing to do here)
        # or it did not match any subcommand, which is reported as a usage error.
        if ctx.subcommand_passed is not None:
            if ctx.invoked_subcommand is None:
                raise UserInputError
            return

        embed = Embed(title=t.reactionpin, colour=Colors.ReactionPin)

        keep_pin_message = await ReactionPinSettings.keep_pin_message.get()
        embed.add_field(name=t.pin_messages, value=tg.enabled if keep_pin_message else tg.disabled, inline=False)

        out = []
        guild: Guild = ctx.guild
        async for channel in await db.stream(select(ReactionPinChannel)):
            text_channel: Optional[TextChannel] = guild.get_channel(channel.channel)
            # Skip whitelist rows whose channel cannot be resolved in this guild.
            if text_channel is None:
                continue
            out.append(f":small_orange_diamond: {text_channel.mention}")

        if out:
            embed.add_field(name=t.whitelisted_channels, value="\n".join(out))
        else:
            embed.colour = Colors.error
            embed.add_field(name=t.whitelisted_channels, value=t.no_whitelisted_channels)

        await reply(ctx, embed=embed)

    @reactionpin.command(name="add", aliases=["a", "+"])
    @ReactionPinPermission.write.check
    async def reactionpin_add(self, ctx: Context, channel: TextChannel):
        """
        add channel to whitelist
        """

        # Refuse channels in which the bot could not pin messages anyway.
        if not channel.permissions_for(ctx.guild.me).manage_messages:
            raise CommandError(t.no_permission)

        if await db.exists(select(ReactionPinChannel).filter_by(channel=channel.id)):
            raise CommandError(t.channel_already_whitelisted)

        await ReactionPinChannel.create(channel.id)
        embed = Embed(title=t.reactionpin, colour=Colors.ReactionPin, description=t.channel_whitelisted)
        await reply(ctx, embed=embed)
        await send_to_changelog(ctx.guild, t.log_channel_whitelisted_rp(channel.mention))

    @reactionpin.command(name="remove", aliases=["del", "r", "d", "-"])
    @ReactionPinPermission.write.check
    async def reactionpin_remove(self, ctx: Context, channel: TextChannel):
        """
        remove channel from whitelist
        """

        # Fetch the whitelist row so it can be deleted; a missing row is a user error.
        if not (row := await db.first(select(ReactionPinChannel).filter_by(channel=channel.id))):
            raise CommandError(t.channel_not_whitelisted)

        await db.delete(row)
        embed = Embed(title=t.reactionpin, colour=Colors.ReactionPin, description=t.channel_removed)
        await reply(ctx, embed=embed)
        await send_to_changelog(ctx.guild, t.log_channel_removed_rp(channel.mention))

    @reactionpin.command(name="pin_message", aliases=["pm"])
    @ReactionPinPermission.write.check
    async def reactionpin_pin_message(self, ctx: Context, enabled: bool):
        """
        enable/disable "pinned a message" notification
        """

        embed = Embed(title=t.reactionpin, colour=Colors.ReactionPin)
        # Persist the new value first, then log the change and confirm it to the caller.
        await ReactionPinSettings.keep_pin_message.set(enabled)
        if enabled:
            embed.description = t.pin_messages_now_enabled
            await send_to_changelog(ctx.guild, t.log_pin_messages_now_enabled)
        else:
            embed.description = t.pin_messages_now_disabled
            await send_to_changelog(ctx.guild, t.log_pin_messages_now_disabled)
        await reply(ctx, embed=embed)