PyDrocsid/cogs

View on GitHub
moderation/logging/cog.py

Summary

Maintainability
A
0 mins
Test Coverage
import json
from datetime import timedelta
from io import StringIO
from typing import Optional, Union

from discord import Embed, File, Forbidden, Guild, Member, Message, RawMessageDeleteEvent, TextChannel
from discord.ext import commands, tasks
from discord.ext.commands import Command, CommandError, Context, Group, UserInputError, guild_only
from discord.utils import format_dt, snowflake_time, utcnow

from PyDrocsid.cog import Cog
from PyDrocsid.command import docs, reply
from PyDrocsid.database import db_wrapper
from PyDrocsid.embeds import send_long_embed
from PyDrocsid.environment import CACHE_TTL
from PyDrocsid.logger import get_logger
from PyDrocsid.redis import redis
from PyDrocsid.translations import t
from PyDrocsid.util import calculate_edit_distance, check_message_send_permissions

from .colors import Colors
from .models import LogExclude
from .permissions import LoggingPermission
from .settings import LoggingSettings
from ...contributor import Contributor
from ...pubsub import can_respond_on_reaction, ignore_message_delete, ignore_message_edit, send_alert, send_to_changelog


# Module-level logger, named after this module.
logger = get_logger(__name__)

# Keep a handle on the `g` translation namespace (presumably the shared/global strings — confirm)
# before the name `t` is rebound on the next line.
tg = t.g
# From here on `t` refers only to the `logging` translation namespace.
t = t.logging


def add_field(embed: Embed, name: str, text: str):
    """Append ``text`` to ``embed`` as one or more non-inline fields.

    The text is cut into consecutive slices of at most 1024 characters. The
    first slice is added under ``name``; every following slice is added under
    a field named with a single ``\\ufeff`` character. Nothing is added when
    ``text`` is empty.
    """
    chunk_size = 1024
    for offset in range(0, len(text), chunk_size):
        field_name = name if offset == 0 else "\ufeff"
        embed.add_field(name=field_name, value=text[offset : offset + chunk_size], inline=False)


async def send_to_channel(guild: Guild, setting: LoggingSettings, message: Union[str, Embed]):
    """Send ``message`` as an embed to the guild channel whose ID is stored in ``setting``.

    A plain string is wrapped in an embed coloured ``Colors.changelog``; an
    ``Embed`` is sent unchanged. If the channel cannot be resolved, or sending
    raises ``Forbidden``, a warning is logged instead; a successful send is
    logged at info level.

    :param guild: guild in which the channel ID is looked up
    :param setting: setting holding the target channel ID
    :param message: text or embed to deliver
    """
    # Text used in the log lines: embeds are serialised to JSON, strings are used as-is.
    if isinstance(message, Embed):
        log_text = json.dumps(message.to_dict())
    else:
        log_text = message

    target: Optional[TextChannel] = guild.get_channel(await setting.get())
    if not target:
        logger.warning(f"Could not send message to {setting.name}: {log_text}")
        return

    payload = Embed(colour=Colors.changelog, description=message) if isinstance(message, str) else message

    try:
        await target.send(embed=payload)
    except Forbidden:
        logger.warning(f"Could not send message to {setting.name}: {log_text}")
        return

    logger.info(f"{setting.name}: {log_text}")


async def is_logging_channel(channel: TextChannel) -> bool:
    """Return whether ``channel`` is the configured edit log or delete log channel."""
    candidates = (LoggingSettings.edit_channel, LoggingSettings.delete_channel)
    for candidate in candidates:
        configured_id = await candidate.get()
        if channel.id == configured_id:
            return True

    return False


def _dump_embeds(embeds: list[Embed], file_name: str) -> File:
    """Serialise ``embeds`` to indented JSON and wrap the text in a ``File`` named ``file_name``."""
    serialised = json.dumps([item.to_dict() for item in embeds], indent=4)
    return File(fp=StringIO(serialised), filename=file_name)


# Names of the log channel kinds registered via add_channel(), in registration order.
channels: list[str] = []


def add_channel(group: Group, name: str, *aliases: str) -> tuple[Group, Command, Command]:
    """Register the management commands for one log channel kind below ``group``.

    ``name`` is appended to the module-level ``channels`` list, and a command
    group called ``name`` (with ``aliases``) is created under ``group``. It gets
    two subcommands: ``channel`` stores a text channel in the
    ``<name>_channel`` setting, and ``disable`` resets that setting.

    :param group: parent command group the new group is attached to
    :param name: channel kind; also selects ``t.channels.<name>`` and ``LoggingSettings.<name>_channel``
    :param aliases: additional names for the new group
    :return: the new group, its ``channel`` command and its ``disable`` command
    """
    channels.append(name)
    setting_attr = f"{name}_channel"

    @group.group(name=name, aliases=list(aliases))
    @LoggingPermission.write.check
    @docs(getattr(t.channels, name).manage_description)
    async def logging_channel(_, ctx: Context):
        # The bare group has no action of its own: a valid subcommand is required.
        if ctx.invoked_subcommand is None:
            raise UserInputError

    # NOTE(review): unlike logging_channel, the two subcommand callbacks below take no
    # leading cog argument — presumably they are invoked without one; confirm before changing.
    @logging_channel.command(name="channel", aliases=["ch", "c"])
    @docs(getattr(t.channels, name).set_description)
    async def set_channel(ctx: Context, *, channel: TextChannel):
        check_message_send_permissions(channel, check_embed=True)

        await getattr(LoggingSettings, setting_attr).set(channel.id)
        # The same text is shown to the invoker and forwarded to the changelog.
        text = getattr(t.channels, name).updated(channel.mention)
        await reply(ctx, embed=Embed(title=t.logging, description=text, color=Colors.Logging))
        await send_to_changelog(ctx.guild, text)

    @logging_channel.command(name="disable", aliases=["d"])
    @docs(getattr(t.channels, name).disable_description)
    async def disable_channel(ctx: Context):
        await getattr(LoggingSettings, setting_attr).reset()
        text = getattr(t.channels, name).disabled
        await reply(ctx, embed=Embed(title=t.logging, description=text, color=Colors.Logging))
        await send_to_changelog(ctx.guild, text)

    return logging_channel, set_channel, disable_channel


class LoggingCog(Cog, name="Logging"):
    CONTRIBUTORS = [Contributor.Defelo, Contributor.wolflu, Contributor.Tert0, Contributor.Infinity]

    async def get_logging_channel(self, setting: LoggingSettings) -> Optional[TextChannel]:
        """Return what ``self.bot.get_channel`` yields for the channel ID stored in ``setting``."""
        channel_id = await setting.get()
        return self.bot.get_channel(channel_id)

    @send_to_changelog.subscribe
    async def handle_send_to_changelog(self, guild: Guild, message: Union[str, Embed]):
        """Forward a ``send_to_changelog`` message to the configured changelog channel."""
        target_setting = LoggingSettings.changelog_channel
        await send_to_channel(guild, target_setting, message)

    @send_alert.subscribe
    async def handle_send_alert(self, guild: Guild, message: Union[str, Embed]):
        """Forward a ``send_alert`` message to the configured alert channel."""
        target_setting = LoggingSettings.alert_channel
        await send_to_channel(guild, target_setting, message)

    @can_respond_on_reaction.subscribe
    async def handle_can_respond_on_reaction(self, channel: TextChannel) -> bool:
        """Return ``False`` if ``channel`` is any of the configured log channels, ``True`` otherwise."""
        log_channel_settings = (
            LoggingSettings.edit_channel,
            LoggingSettings.delete_channel,
            LoggingSettings.alert_channel,
            LoggingSettings.changelog_channel,
            LoggingSettings.member_join_channel,
            LoggingSettings.member_leave_channel,
        )
        for entry in log_channel_settings:
            configured_id = await entry.get()
            if configured_id != channel.id:
                continue
            return False
        return True

    @ignore_message_edit.subscribe
    async def handle_ignore_message_edit(self, message: Message):
        """Store a redis marker (expiring after ``CACHE_TTL``) that makes the edit handlers skip ``message`` once."""
        marker_key = f"ignore_message_edit:{message.channel.id}:{message.id}"
        await redis.setex(marker_key, CACHE_TTL, 1)

    @ignore_message_delete.subscribe
    async def handle_ignore_message_delete(self, message: Message):
        """Store a redis marker (expiring after ``CACHE_TTL``) that makes the delete handlers skip ``message`` once."""
        marker_key = f"ignore_message_delete:{message.channel.id}:{message.id}"
        await redis.setex(marker_key, CACHE_TTL, 1)

    async def on_ready(self):
        """Start the periodic cleanup task; if starting raises ``RuntimeError``, restart it instead."""
        cleanup = self.cleanup_loop
        try:
            cleanup.start()
        except RuntimeError:
            # start() refused — presumably because the task is already running.
            cleanup.restart()

    @tasks.loop(minutes=30)
    @db_wrapper
    async def cleanup_loop(self):
        """Delete messages older than the configured maximum age from the edit and delete log channels.

        Runs every 30 minutes. A ``maxage`` setting of ``-1`` disables the
        cleanup. Each channel's history is read oldest-first and scanning stops
        at the first message that is newer than the cutoff.
        """
        days: int = await LoggingSettings.maxage.get()
        if days == -1:
            return

        try:
            timestamp = utcnow() - timedelta(days=days)
        except OverflowError:
            # The maxage command accepts values up to 2**31 - 1 days, which is beyond what
            # timedelta/datetime arithmetic can represent. Such a cutoff lies before any
            # possible message, so there is nothing to delete — and letting the error
            # propagate would abort this periodic task.
            return

        for setting in [LoggingSettings.edit_channel, LoggingSettings.delete_channel]:
            channel: Optional[TextChannel] = await self.get_logging_channel(setting)
            if channel is None:
                continue

            async for message in channel.history(limit=None, oldest_first=True):  # type: Message
                # History is ordered oldest-first, so everything after this message is newer as well.
                if message.created_at > timestamp:
                    break

                await message.delete()

    async def on_message_edit(self, before: Message, after: Message):
        """Log an edited guild message to the edit log channel.

        Nothing is logged when the message is outside a guild, an ignore marker
        exists for it, no edit log channel is configured, or the channel is
        excluded. Edits whose edit distance to the reference content is below
        the ``edit_mindiff`` setting (with unchanged embeds) are not logged;
        instead the pre-edit content is remembered in redis for 24 hours and
        used as the reference content for later edits.
        """
        if before.guild is None:
            return

        # redis.delete reports whether a marker existed, so the marker is consumed by this check.
        ignore_marker_removed = await redis.delete(f"ignore_message_edit:{before.channel.id}:{before.id}")
        if ignore_marker_removed:
            return

        mindiff: int = await LoggingSettings.edit_mindiff.get()
        key = f"little_diff_message_edit:{before.id}"
        old_message = await redis.get(key) or before.content

        if calculate_edit_distance(old_message, after.content) < mindiff and before.embeds == after.embeds:
            # Too small to log: remember the oldest unlogged content, unless one is already stored.
            already_stored = await redis.exists(key)
            if not already_stored:
                await redis.setex(key, 60 * 60 * 24, before.content)
            return

        edit_channel = await self.get_logging_channel(LoggingSettings.edit_channel)
        if edit_channel is None:
            return
        if await LogExclude.exists(after.channel.id):
            return

        # This edit is logged, so the remembered reference content is no longer needed.
        await redis.delete(key)

        author = before.author
        created = before.created_at
        embed = Embed(title=t.message_edited, color=Colors.edit)
        embed.set_author(name=str(author), icon_url=author.display_avatar.url)
        inline_fields = (
            (t.channel, before.channel.mention),
            (t.author, author.mention),
            (t.author_id, author.id),
            (t.message_id, before.id),
            (t.created_at, f"{format_dt(created, style='D')} {format_dt(created, style='T')}"),
        )
        for field_name, field_value in inline_fields:
            embed.add_field(name=field_name, value=field_value)
        embed.add_field(name=t.url, value=before.jump_url, inline=False)
        add_field(embed, t.old_content, old_message)
        add_field(embed, t.new_content, after.content)

        # Embeds of either version are attached as JSON dumps.
        files = []
        if before.embeds:
            files.append(_dump_embeds(before.embeds, t.before_edited_embeds))
        if after.embeds:
            files.append(_dump_embeds(after.embeds, t.after_edited_embeds))

        await edit_channel.send(embed=embed, files=files)

    async def on_raw_message_edit(self, channel: TextChannel, message: Message):
        """Log a message edit for which only the edited version is available.

        The embed always contains the channel; author, IDs, creation time, jump
        URL and new content are added when ``message`` is available. Embeds of
        the message are attached as a JSON dump.

        ``message`` is annotated as ``Message``, but part of the original code
        guarded against ``None`` while other parts dereferenced it
        unconditionally. The handling is now consistent: a missing message no
        longer raises ``AttributeError``, and behaviour with a message present
        is unchanged.
        """
        # Without a message, fall back to the channel to decide whether this is a guild event.
        guild = message.guild if message is not None else getattr(channel, "guild", None)
        if guild is None:
            return
        # The ignore marker is keyed by message ID, so it can only be consulted when the message is known.
        if message is not None and await redis.delete(f"ignore_message_edit:{channel.id}:{message.id}"):
            return
        if (edit_channel := await self.get_logging_channel(LoggingSettings.edit_channel)) is None:
            return
        source_channel = message.channel if message is not None else channel
        if await LogExclude.exists(source_channel.id):
            return

        embed = Embed(title=t.message_edited, color=Colors.edit)
        embed.add_field(name=t.channel, value=channel.mention)
        file = None
        if message is not None:
            embed.set_author(name=str(message.author), icon_url=message.author.display_avatar.url)
            embed.add_field(name=t.author, value=message.author.mention)
            embed.add_field(name=t.author_id, value=message.author.id)
            embed.add_field(name=t.message_id, value=message.id)
            embed.add_field(
                name=t.created_at,
                value=f"{format_dt(message.created_at, style='D')} {format_dt(message.created_at, style='T')}",
            )
            embed.add_field(name=t.url, value=message.jump_url, inline=False)
            add_field(embed, t.new_content, message.content)
            if message.embeds:
                file = _dump_embeds(message.embeds, t.after_edited_embeds)
        await edit_channel.send(embed=embed, file=file)

    async def on_message_delete(self, message: Message):
        """Log a deleted guild message, including its content, attachments and embeds.

        Nothing is logged when the message is outside a guild, an ignore marker
        exists for it, no delete log channel is configured, the message was in
        the edit/delete log channel itself, or its channel is excluded.
        """
        if message.guild is None:
            return
        # redis.delete reports whether a marker existed, so the marker is consumed by this check.
        if await redis.delete(f"ignore_message_delete:{message.channel.id}:{message.id}"):
            return
        if (delete_channel := await self.get_logging_channel(LoggingSettings.delete_channel)) is None:
            return
        # The remembered pre-edit content of a deleted message is no longer needed.
        await redis.delete(f"little_diff_message_edit:{message.id}")
        if await is_logging_channel(message.channel):
            return
        if await LogExclude.exists(message.channel.id):
            return

        embed = Embed(title=t.message_deleted, color=Colors.delete)
        embed.set_author(name=str(message.author), icon_url=message.author.display_avatar.url)
        embed.add_field(name=t.channel, value=message.channel.mention)
        embed.add_field(name=t.author, value=message.author.mention)
        embed.add_field(name=t.author_id, value=message.author.id)
        embed.add_field(name=t.message_id, value=message.id)
        embed.add_field(
            name=t.created_at,
            value=f"{format_dt(message.created_at, style='D')} {format_dt(message.created_at, style='T')}",
        )
        add_field(embed, t.old_content, message.content)
        if message.attachments:
            units = "BKMG"
            out = []
            for attachment in message.attachments:
                size = attachment.size
                unit_index = 0
                # Scale by 1000 per unit, but stop at the largest known unit. The previous loop
                # divided once more after "G", reporting a value 1000x too small for huge sizes.
                while size >= 1000 and unit_index < len(units) - 1:
                    size /= 1000
                    unit_index += 1
                out.append(f"[{attachment.filename}]({attachment.url}) ({size:.1f} {units[unit_index]})")
            # Route the list through add_field so that text longer than 1024 characters is split
            # into continuation fields instead of producing an oversized field value. Up to 1024
            # characters the result is a single non-inline field, as before; a split may fall
            # inside a link.
            add_field(embed, t.attachments, "\n".join(out))
        file = None
        if message.embeds:
            file = _dump_embeds(message.embeds, t.after_deleted_embeds)
        await delete_channel.send(embed=embed, file=file)

    async def on_raw_message_delete(self, event: RawMessageDeleteEvent):
        """Log the deletion of a message for which only the raw event is available.

        The embed contains the message ID and the creation time derived from
        that ID; the channel is added when it can be resolved. Nothing is logged
        outside guilds, when an ignore marker exists, when no delete log channel
        is configured, when the channel is excluded, or when the deletion
        happened in the edit/delete log channel itself.
        """
        if event.guild_id is None:
            return
        # redis.delete reports whether a marker existed, so the marker is consumed by this check.
        if await redis.delete(f"ignore_message_delete:{event.channel_id}:{event.message_id}"):
            return
        if (delete_channel := await self.get_logging_channel(LoggingSettings.delete_channel)) is None:
            return
        await redis.delete(f"little_diff_message_edit:{event.message_id}")
        if await LogExclude.exists(event.channel_id):
            return

        embed = Embed(title=t.message_deleted, color=Colors.delete)
        channel: Optional[TextChannel] = self.bot.get_channel(event.channel_id)
        if channel is not None:
            if await is_logging_channel(channel):
                return

            embed.add_field(name=t.channel, value=channel.mention)

        # Message ID and creation time come from the event alone, so they are reported even when
        # the channel could not be resolved (previously that case produced an embed with only a title).
        embed.add_field(name=t.message_id, value=event.message_id, inline=False)
        created_at = snowflake_time(event.message_id)
        embed.add_field(
            name=t.created_at, value=f"{format_dt(created_at, style='D')} {format_dt(created_at, style='T')}"
        )
        await delete_channel.send(embed=embed)

    async def on_member_join(self, member: Member):
        """Post a join notice to the member-join log channel, if one is configured."""
        log_channel = await self.get_logging_channel(LoggingSettings.member_join_channel)
        if log_channel is not None:
            await log_channel.send(t.member_joined_server(member.mention, member))

    async def on_member_remove(self, member: Member):
        """Post a leave notice to the member-leave log channel, if one is configured."""
        log_channel = await self.get_logging_channel(LoggingSettings.member_leave_channel)
        if log_channel is not None:
            await log_channel.send(t.member_left_server(member))

    @commands.group(aliases=["log"])
    @LoggingPermission.read.check
    @guild_only()
    @docs(t.commands.logging)
    async def logging(self, ctx: Context):
        # Root command: without a subcommand it shows the current logging configuration
        # (maximum age, every registered log channel and the edit minimum distance).
        subcommand_attempted = ctx.subcommand_passed is not None
        if subcommand_attempted and ctx.invoked_subcommand is None:
            # Something was passed, but it is not a registered subcommand.
            raise UserInputError
        if subcommand_attempted:
            return

        embed = Embed(title=t.logging, color=Colors.Logging)

        maxage: int = await LoggingSettings.maxage.get()
        # -1 means that no maximum age is configured.
        maxage_text = tg.disabled if maxage == -1 else tg.x_days(cnt=maxage)
        embed.add_field(name=t.maxage, value=maxage_text, inline=False)

        for name in channels:
            setting = getattr(LoggingSettings, f"{name}_channel")
            channel: Optional[TextChannel] = await self.get_logging_channel(setting)
            is_edit = name == "edit"
            embed.add_field(
                name=getattr(t.channels, name).name,
                value=channel.mention if channel else tg.disabled,
                inline=is_edit,
            )

            # The edit channel is shown inline, next to its minimum distance.
            if is_edit and channel is not None:
                mindist: int = await LoggingSettings.edit_mindiff.get()
                embed.add_field(name=t.channels.edit.mindist.name, value=str(mindist), inline=True)

        await reply(ctx, embed=embed)

    @logging.command(name="maxage", aliases=["ma"])
    @LoggingPermission.write.check
    @docs(t.commands.maxage)
    async def logging_maxage(self, ctx: Context, days: int):
        # Accepts -1 (disable) or a positive number of days below 2**31.
        is_valid = days == -1 or 0 < days < (1 << 31)
        if not is_valid:
            raise CommandError(tg.invalid_duration)

        await LoggingSettings.maxage.set(days)
        embed = Embed(title=t.logging, color=Colors.Logging)

        # The same text is used for the reply and for the changelog entry.
        text = t.maxage_set_disabled if days == -1 else t.maxage_set(cnt=days)
        embed.description = text
        await send_to_changelog(ctx.guild, text)

        await reply(ctx, embed=embed)

    # Register one command group per log channel kind below the `logging` group.
    # add_channel() returns (group, set command, disable command); only the group is kept
    # as a class attribute, the two subcommand objects are discarded via `*_`.
    # The first string is the channel kind (it selects the setting and translation entries),
    # the remaining strings are aliases of the group.
    logging_edit, *_ = add_channel(logging, "edit", "e")
    logging_delete, *_ = add_channel(logging, "delete", "d")
    logging_alert, *_ = add_channel(logging, "alert", "al", "a")
    logging_changelog, *_ = add_channel(logging, "changelog", "change", "cl", "c")
    logging_member_join, *_ = add_channel(logging, "member_join", "memberjoin", "join", "mj")
    logging_member_leave, *_ = add_channel(logging, "member_leave", "memberleave", "leave", "ml")

    @logging_edit.command(name="mindist", aliases=["md"])
    @docs(t.channels.edit.mindist.set_description)
    async def logging_edit_mindist(self, ctx: Context, mindist: int):
        # Sets the minimum edit distance a message edit needs in order to be logged.
        if mindist < 1:
            raise CommandError(t.channels.edit.mindist.gt_zero)

        await LoggingSettings.edit_mindiff.set(mindist)

        texts = t.channels.edit.mindist
        confirmation = Embed(title=t.logging, description=texts.updated(mindist), color=Colors.Logging)
        await reply(ctx, embed=confirmation)
        await send_to_changelog(ctx.guild, texts.log_updated(mindist))

    @logging.group(name="exclude", aliases=["x", "ignore", "i"])
    @docs(t.commands.exclude)
    async def logging_exclude(self, ctx: Context):
        # Without a subcommand this lists the channels excluded from logging; entries whose
        # channel can no longer be resolved are removed from the exclusion list on the way.
        #
        # Subcommand detection relies on ctx.subcommand_passed, like the parent `logging`
        # group. The previous check counted the words left after
        # `ctx.message.content.lstrip(ctx.prefix)`; str.lstrip strips a *set of characters*
        # rather than a prefix, so it could also eat leading characters of the command name.
        if ctx.subcommand_passed is not None:
            if ctx.invoked_subcommand is None:
                # Something was passed, but it is not a registered subcommand.
                raise UserInputError
            return

        embed = Embed(title=t.excluded_channels, colour=Colors.Logging)
        out = []
        for channel_id in await LogExclude.all():
            channel: Optional[TextChannel] = self.bot.get_channel(channel_id)
            if channel is None:
                await LogExclude.remove(channel_id)
            else:
                out.append(f":small_blue_diamond: {channel.mention}")
        if not out:
            embed.description = t.no_channels_excluded
            embed.colour = Colors.error
        else:
            embed.description = "\n".join(out)
        await send_long_embed(ctx, embed)

    @logging_exclude.command(name="add", aliases=["a", "+"])
    @LoggingPermission.write.check
    @docs(t.commands.exclude_add)
    async def logging_exclude_add(self, ctx: Context, channel: TextChannel):
        # Adds a channel to the exclusion list; refuses channels that are already excluded.
        already_listed = await LogExclude.exists(channel.id)
        if already_listed:
            raise CommandError(t.already_excluded)

        await LogExclude.add(channel.id)
        confirmation = Embed(title=t.excluded_channels, description=t.excluded, colour=Colors.Logging)
        await reply(ctx, embed=confirmation)
        await send_to_changelog(ctx.guild, t.log_excluded(channel.mention))

    @logging_exclude.command(name="remove", aliases=["r", "del", "d", "-"])
    @LoggingPermission.write.check
    @docs(t.commands.exclude_remove)
    async def logging_exclude_remove(self, ctx: Context, channel: TextChannel):
        # Removes a channel from the exclusion list; refuses channels that are not excluded.
        currently_listed = await LogExclude.exists(channel.id)
        if not currently_listed:
            raise CommandError(t.not_excluded)

        await LogExclude.remove(channel.id)
        confirmation = Embed(title=t.excluded_channels, description=t.unexcluded, colour=Colors.Logging)
        await reply(ctx, embed=confirmation)
        await send_to_changelog(ctx.guild, t.log_unexcluded(channel.mention))