moderation/mediaonly/cog.py
import re
from datetime import datetime
from typing import Optional
from aiohttp import ClientError, ClientSession
from discord import Embed, Forbidden, Guild, Message, TextChannel
from discord.ext import commands
from discord.ext.commands import CommandError, Context, UserInputError, guild_only
from PyDrocsid.cog import Cog
from PyDrocsid.command import docs, reply
from PyDrocsid.database import db, filter_by
from PyDrocsid.embeds import send_long_embed
from PyDrocsid.events import StopEventHandling
from PyDrocsid.translations import t
from .colors import Colors
from .models import MediaOnlyChannel, MediaOnlyDeletion
from .permissions import MediaOnlyPermission
from ...contributor import Contributor
from ...pubsub import can_respond_on_reaction, get_userlog_entries, send_alert, send_to_changelog
tg = t.g
t = t.mediaonly
async def contains_image(message: Message) -> bool:
urls = [(att.url,) for att in message.attachments]
urls += re.findall(r"(https?://([a-zA-Z0-9\-_~]+\.)+[a-zA-Z0-9\-_~]+(/\S*)?)", message.content)
for url, *_ in urls:
try:
async with ClientSession() as session, session.head(url, allow_redirects=True) as response:
content_length = int(response.headers["Content-length"])
mime = response.headers["Content-type"]
except (KeyError, AttributeError, UnicodeError, ConnectionError, ClientError):
break
if mime.startswith("image/") and content_length >= 256:
return True
return False
async def delete_message(message: Message):
try:
await message.delete()
except Forbidden:
deleted = False
else:
deleted = True
await MediaOnlyDeletion.create(message.author.id, str(message.author), message.channel.id)
embed = Embed(title=t.mediaonly, description=t.deleted_nomedia, colour=Colors.error)
await message.channel.send(content=message.author.mention, embed=embed, delete_after=30)
if deleted:
await send_alert(message.guild, t.log_deleted_nomedia(message.author.mention, message.channel.mention))
else:
await send_alert(message.guild, t.log_nomedia_not_deleted(message.author.mention, message.channel.mention))
async def check_message(message: Message):
if message.guild is None or message.author.bot:
return
if await MediaOnlyPermission.bypass.check_permissions(message.author):
return
if not await MediaOnlyChannel.exists(message.channel.id):
return
if await contains_image(message):
return
await delete_message(message)
raise StopEventHandling
class MediaOnlyCog(Cog, name="MediaOnly"):
CONTRIBUTORS = [Contributor.Defelo, Contributor.wolflu]
@can_respond_on_reaction.subscribe
async def handle_can_respond_on_reaction(self, channel: TextChannel) -> bool:
return not await db.exists(filter_by(MediaOnlyChannel, channel=channel.id))
@get_userlog_entries.subscribe
async def handle_get_userlog_entries(self, user_id: int, _) -> list[tuple[datetime, str]]:
out: list[tuple[datetime, str]] = []
deletion: MediaOnlyDeletion
async for deletion in await db.stream(filter_by(MediaOnlyDeletion, member=user_id)):
out.append((deletion.timestamp, t.ulog_deletion(f"<#{deletion.channel}>")))
return out
async def on_message(self, message: Message):
await check_message(message)
async def on_message_edit(self, _, after: Message):
await check_message(after)
@commands.group(aliases=["mo"])
@MediaOnlyPermission.read.check
@guild_only()
@docs(t.commands.mediaonly)
async def mediaonly(self, ctx: Context):
if ctx.subcommand_passed is not None:
if ctx.invoked_subcommand is None:
raise UserInputError
return
guild: Guild = ctx.guild
out = []
async for channel in MediaOnlyChannel.stream():
text_channel: Optional[TextChannel] = guild.get_channel(channel)
if not text_channel:
await MediaOnlyChannel.remove(channel)
continue
out.append(f":small_orange_diamond: {text_channel.mention}")
embed = Embed(title=t.media_only_channels_header, colour=Colors.error)
if out:
embed.colour = Colors.MediaOnly
embed.description = "\n".join(out)
await send_long_embed(ctx, embed)
else:
embed.description = t.no_media_only_channels
await reply(ctx, embed=embed)
@mediaonly.command(name="add", aliases=["a", "+"])
@MediaOnlyPermission.write.check
@docs(t.commands.add)
async def mediaonly_add(self, ctx: Context, channel: TextChannel):
if await MediaOnlyChannel.exists(channel.id):
raise CommandError(t.channel_already_media_only)
if not channel.permissions_for(channel.guild.me).manage_messages:
raise CommandError(t.media_only_not_changed_no_permissions)
await MediaOnlyChannel.add(channel.id)
embed = Embed(title=t.media_only_channels_header, description=t.channel_now_media_only, colour=Colors.MediaOnly)
await reply(ctx, embed=embed)
await send_to_changelog(ctx.guild, t.log_channel_now_media_only(channel.mention))
@mediaonly.command(name="remove", aliases=["del", "r", "d", "-"])
@MediaOnlyPermission.write.check
@docs(t.commands.remove)
async def mediaonly_remove(self, ctx: Context, channel: TextChannel):
if not await MediaOnlyChannel.exists(channel.id):
raise CommandError(t.channel_not_media_only)
await MediaOnlyChannel.remove(channel.id)
embed = Embed(
title=t.media_only_channels_header, description=t.channel_not_media_only_anymore, colour=Colors.MediaOnly
)
await reply(ctx, embed=embed)
await send_to_changelog(ctx.guild, t.log_channel_not_media_only_anymore(channel.mention))