PyDrocsid/cogs

View on GitHub
moderation/message/cog.py

Summary

Maintainability
A
0 mins
Test Coverage
from typing import Optional

from discord import Embed, File, Forbidden, HTTPException, Member, Message, NotFound, Permissions, TextChannel
from discord.ext import commands
from discord.ext.commands import CommandError, Context, UserInputError, guild_only

from PyDrocsid.cog import Cog
from PyDrocsid.command import Confirmation, add_reactions, docs, reply
from PyDrocsid.converter import Color
from PyDrocsid.discohook import (
    DISCOHOOK_EMPTY_MESSAGE,
    DiscoHookError,
    MessageContent,
    create_discohook_link,
    load_discohook_link,
)
from PyDrocsid.translations import t
from PyDrocsid.util import check_message_send_permissions, read_complete_message, read_normal_message

from .colors import Colors
from .permissions import MessagePermission
from ...contributor import Contributor
from ...pubsub import send_alert


tg = t.g
t = t.message


class MessageCog(Cog, name="Message Commands"):
    CONTRIBUTORS = [Contributor.Defelo, Contributor.wolflu, Contributor.LoC]

    async def get_message_cancel(self, channel: TextChannel, member: Member) -> tuple[Optional[str], list[File]]:
        content, files = await read_normal_message(self.bot, channel, member)
        if content == t.cancel:
            embed = Embed(title=t.messages, colour=Colors.MessageCommands, description=t.msg_send_cancel)
            await channel.send(embed=embed)
            return None, []

        return content, files

    @commands.group()
    @MessagePermission.send.check
    @guild_only()
    @docs(t.commands.send)
    async def send(self, ctx: Context):
        if ctx.invoked_subcommand is None:
            raise UserInputError

    @send.command(name="text", aliases=["t"])
    @docs(t.commands.send_text)
    async def send_text(self, ctx: Context, channel: TextChannel):
        check_message_send_permissions(channel)

        embed = Embed(title=t.messages, colour=Colors.MessageCommands, description=t.send_message(t.cancel))
        await reply(ctx, embed=embed)
        content, files = await self.get_message_cancel(ctx.channel, ctx.author)

        if content is None:
            return

        try:
            await channel.send(content=content, files=files)
        except (HTTPException, Forbidden):
            raise CommandError(t.msg_could_not_be_sent)
        else:
            embed.description = t.msg_sent
            await reply(ctx, embed=embed)

    @send.command(name="embed", aliases=["e"])
    @docs(t.commands.send_embed)
    async def send_embed(self, ctx: Context, channel: TextChannel, color: Optional[Color] = None):
        check_message_send_permissions(channel, check_embed=True)

        embed = Embed(title=t.messages, colour=Colors.MessageCommands, description=t.send_embed_title(t.cancel))
        await reply(ctx, embed=embed)
        title, _ = await self.get_message_cancel(ctx.channel, ctx.author)
        if title is None:
            return
        if len(title) > 256:
            raise CommandError(t.title_too_long)

        embed.description = t.send_embed_content(t.cancel)
        await reply(ctx, embed=embed)
        content, files = await self.get_message_cancel(ctx.channel, ctx.author)

        if content is None:
            return

        send_embed = Embed(title=title, description=content)

        if files and any(files[0].filename.lower().endswith(ext) for ext in ["jpg", "jpeg", "png", "gif"]):
            send_embed.set_image(url="attachment://" + files[0].filename)

        if color is not None:
            send_embed.colour = color

        try:
            await channel.send(embed=send_embed, files=files)
        except (HTTPException, Forbidden):
            raise CommandError(t.msg_could_not_be_sent)
        else:
            embed.description = t.msg_sent
            await reply(ctx, embed=embed)

    @send.command(name="copy", aliases=["c"])
    @docs(t.commands.send_copy)
    async def send_copy(self, ctx: Context, channel: TextChannel, message: Message):
        content, files, embed = await read_complete_message(message)
        try:
            await channel.send(content=content, embed=embed, files=files)
        except (HTTPException, Forbidden):
            raise CommandError(t.msg_could_not_be_sent)
        else:
            embed = Embed(title=t.messages, colour=Colors.MessageCommands, description=t.msg_sent)
            await reply(ctx, embed=embed)

    @send.command(name="discohook", aliases=["dh"])
    @docs(t.commands.send_discohook(DISCOHOOK_EMPTY_MESSAGE))
    async def send_discohook(self, ctx: Context, channel: TextChannel, *, discohook_url: str):
        try:
            messages: list[MessageContent] = [
                msg for msg in await load_discohook_link(discohook_url) if not msg.is_empty
            ]
        except DiscoHookError:
            raise CommandError(t.discohook_invalid)

        if not messages:
            raise CommandError(t.discohook_empty)

        check_message_send_permissions(channel, check_embed=any(m.embeds for m in messages))

        try:
            for message in messages:
                content: str | None = message.content
                for embed in message.embeds or [None]:
                    await channel.send(content=content, embed=embed)
                    content = None
        except (HTTPException, Forbidden):
            raise CommandError(t.msg_could_not_be_sent)

        await add_reactions(ctx.message, "white_check_mark")

    @commands.group()
    @MessagePermission.edit.check
    @guild_only()
    @docs(t.commands.edit)
    async def edit(self, ctx: Context):
        if ctx.invoked_subcommand is None:
            raise UserInputError

    @edit.command(name="text", aliases=["t"])
    @docs(t.commands.edit_text)
    async def edit_text(self, ctx: Context, message: Message):
        if message.author != self.bot.user:
            raise CommandError(t.could_not_edit)
        check_message_send_permissions(message.channel, check_send=False)

        embed = Embed(title=t.messages, colour=Colors.MessageCommands, description=t.send_new_message(t.cancel))
        await reply(ctx, embed=embed)
        content, files = await self.get_message_cancel(ctx.channel, ctx.author)

        if content is None:
            return

        if files:
            raise CommandError(t.cannot_edit_files)

        await message.edit(content=content, embed=None)
        embed.description = t.msg_edited
        await reply(ctx, embed=embed)

    @edit.command(name="embed", aliases=["e"])
    @docs(t.commands.edit_embed)
    async def edit_embed(self, ctx: Context, message: Message, color: Optional[Color] = None):
        if message.author != self.bot.user:
            raise CommandError(t.could_not_edit)
        check_message_send_permissions(message.channel, check_send=False, check_embed=True)

        embed = Embed(title=t.messages, colour=Colors.MessageCommands, description=t.send_embed_title(t.cancel))
        await reply(ctx, embed=embed)
        title, _ = await self.get_message_cancel(ctx.channel, ctx.author)

        if title is None:
            return
        if len(title) > 256:
            raise CommandError(t.title_too_long)

        embed.description = t.send_embed_content(t.cancel)
        await reply(ctx, embed=embed)
        content, _ = await self.get_message_cancel(ctx.channel, ctx.author)

        if content is None:
            return

        send_embed = Embed(title=title, description=content)

        if color is not None:
            send_embed.colour = color
        elif message.embeds and message.embeds[0].color is not Embed.Empty:
            send_embed.colour = message.embeds[0].color

        await message.edit(content=None, files=[], embed=send_embed)
        embed.description = t.msg_edited
        await reply(ctx, embed=embed)

    @edit.command(name="copy", aliases=["c"])
    @docs(t.commands.edit_copy)
    async def edit_copy(self, ctx: Context, message: Message, source: Message):
        if message.author != self.bot.user:
            raise CommandError(t.could_not_edit)

        content, files, embed = await read_complete_message(source)
        if files:
            raise CommandError(t.cannot_edit_files)
        await message.edit(content=content, embed=embed)
        embed = Embed(title=t.messages, colour=Colors.MessageCommands, description=t.msg_edited)
        await reply(ctx, embed=embed)

    @edit.command(name="discohook", aliases=["dh"])
    @docs(t.commands.edit_discohook(DISCOHOOK_EMPTY_MESSAGE))
    async def edit_discohook(self, ctx: Context, message: Message, discohook_url: str):
        if message.author != self.bot.user:
            raise CommandError(t.could_not_edit)

        try:
            messages: list[MessageContent] = [
                msg for msg in await load_discohook_link(discohook_url) if not msg.is_empty
            ]
        except DiscoHookError:
            raise CommandError(t.discohook_invalid)

        if not messages:
            raise CommandError(t.discohook_empty)
        if len(messages) > 1:
            raise CommandError(t.discohook_multiple_messages)

        content, embeds = messages[0]
        if len(embeds) > 1:
            raise CommandError(t.discohook_multiple_embeds)

        try:
            await message.edit(content=content, embeds=embeds)
        except (HTTPException, Forbidden):
            raise CommandError(t.msg_could_not_be_sent)

        await add_reactions(ctx.message, "white_check_mark")

    @commands.command()
    @MessagePermission.delete.check
    @guild_only()
    @docs(t.commands.delete)
    async def delete(self, ctx: Context, message: Message):
        if message.guild is None:
            raise CommandError(t.cannot_delete_dm)

        channel: TextChannel = message.channel
        permissions: Permissions = channel.permissions_for(message.guild.me)
        if message.author != self.bot.user and not permissions.manage_messages:
            raise CommandError(t.could_not_delete)

        await message.delete()
        embed = Embed(title=t.messages, colour=Colors.MessageCommands, description=t.msg_deleted)
        await reply(ctx, embed=embed)

    @commands.command(aliases=["clean"])
    @MessagePermission.clear.check
    @guild_only()
    @docs(t.commands.clear)
    async def clear(self, ctx: Context, count: int):
        channel: TextChannel = ctx.channel

        if count not in range(1, 101):
            raise CommandError(t.count_between)

        if not await Confirmation().run(ctx, t.confirm(channel.mention, cnt=count)):
            return

        messages = (await channel.history(limit=count + 2).flatten())[2:]
        try:
            await channel.delete_messages(messages)
        except (Forbidden, NotFound, HTTPException):
            raise CommandError(t.msg_not_deleted)

        await reply(
            ctx,
            embed=Embed(
                title=t.clear_channel,
                description=t.deleted_messages(channel.mention, cnt=count),
                color=Colors.MessageCommands,
            ),
        )
        await send_alert(ctx.guild, t.log_cleared(ctx.author.mention, channel.mention, cnt=count))

    @commands.command(aliases=["dh"])
    @docs(t.commands.discohook)
    async def discohook(self, ctx: Context, *messages: Message):
        if not messages:
            raise UserInputError
        for msg in messages:
            if not msg.channel.permissions_for(ctx.author).read_message_history:
                raise CommandError(t.cannot_read_messages(msg.channel.mention))

        try:
            url = await create_discohook_link(*messages)
        except DiscoHookError:
            raise CommandError(t.discohook_create_failed)

        await reply(ctx, url)