moderation/mod/cog.py
import re
from datetime import datetime, timedelta
from typing import List, Optional, Tuple, Union
from discord import Embed, Forbidden, Guild, HTTPException, Member, Message, NotFound, Role, User
from discord.ext import commands, tasks
from discord.ext.commands import CommandError, Context, Converter, guild_only
from discord.utils import utcnow
from PyDrocsid.cog import Cog
from PyDrocsid.command import UserCommandError, reply
from PyDrocsid.converter import UserMemberConverter
from PyDrocsid.database import db, db_wrapper, filter_by
from PyDrocsid.settings import RoleSettings
from PyDrocsid.translations import t
from PyDrocsid.util import check_role_assignable, is_teamler
from .colors import Colors
from .models import Ban, Kick, Mute, Report, Warn
from .permissions import ModPermission
from ...contributor import Contributor
from ...pubsub import (
get_user_info_entries,
get_user_status_entries,
get_userlog_entries,
log_auto_kick,
revoke_verification,
send_alert,
send_to_changelog,
)
tg = t.g
t = t.mod
MAX_TIMEOUT = timedelta(days=28)
class DurationConverter(Converter):
async def convert(self, ctx, argument: str) -> Optional[int]:
if argument.lower() in ("inf", "perm", "permanent", "-1", "∞"):
return None
if (match := re.match(r"^(\d+)d?$", argument)) is None:
raise CommandError(tg.invalid_duration)
if (days := int(match.group(1))) <= 0:
raise CommandError(tg.invalid_duration)
if days >= (1 << 31):
raise CommandError(t.invalid_duration_inf)
return days
async def get_mute_role(guild: Guild) -> Role:
mute_role: Optional[Role] = guild.get_role(await RoleSettings.get("mute"))
if mute_role is None:
raise CommandError(t.mute_role_not_set)
return mute_role
async def send_to_changelog_mod(
guild: Guild,
message: Optional[Message],
colour: int,
title: str,
member: Union[Member, User, Tuple[int, str]],
reason: str,
*,
duration: Optional[str] = None,
):
embed = Embed(title=title, colour=colour, timestamp=utcnow())
if isinstance(member, tuple):
member_id, member_name = member
embed.set_author(name=member_name)
else:
member_id: int = member.id
member_name: str = str(member)
embed.set_author(name=member_name, icon_url=member.display_avatar.url)
embed.add_field(name=t.log_field.member, value=f"<@{member_id}>", inline=True)
embed.add_field(name=t.log_field.member_id, value=str(member_id), inline=True)
if message:
embed.set_footer(text=str(message.author), icon_url=message.author.display_avatar.url)
embed.add_field(
name=t.log_field.channel, value=t.jump_url(message.channel.mention, message.jump_url), inline=True
)
if duration:
embed.add_field(name=t.log_field.duration, value=duration, inline=True)
embed.add_field(name=t.log_field.reason, value=reason, inline=False)
await send_to_changelog(guild, embed)
class ModCog(Cog, name="Mod Tools"):
CONTRIBUTORS = [Contributor.Defelo, Contributor.wolflu, Contributor.Florian]
async def on_ready(self):
guild: Guild = self.bot.guilds[0]
mute_role: Optional[Role] = guild.get_role(await RoleSettings.get("mute"))
if mute_role is not None:
async for mute in await db.stream(filter_by(Mute, active=True)):
member: Optional[Member] = guild.get_member(mute.member)
if member is not None:
await member.add_roles(mute_role)
try:
self.mod_loop.start()
except RuntimeError:
self.mod_loop.restart()
@tasks.loop(minutes=30)
@db_wrapper
async def mod_loop(self):
guild: Guild = self.bot.guilds[0]
async for ban in await db.stream(filter_by(Ban, active=True)):
if ban.days != -1 and utcnow() >= ban.timestamp + timedelta(days=ban.days):
await Ban.deactivate(ban.id)
try:
user = await self.bot.fetch_user(ban.member)
except NotFound:
user = ban.member, ban.member_name
if isinstance(user, User):
try:
await guild.unban(user)
except Forbidden:
await send_alert(guild, t.cannot_unban_user_permissions(user.mention, user.id))
await send_to_changelog_mod(guild, None, Colors.unban, t.log_unbanned, user, t.log_unbanned_expired)
mute_role: Optional[Role] = guild.get_role(await RoleSettings.get("mute"))
if mute_role is None:
return
try:
check_role_assignable(mute_role)
except CommandError:
await send_alert(guild, t.cannot_assign_mute_role(mute_role, mute_role.id))
return
mute: Mute
async for mute in await db.stream(filter_by(Mute, active=True)):
member = guild.get_member(mute.member)
timeout: datetime | None = member.communication_disabled_until if member else None
if mute.days != -1 and utcnow() >= mute.timestamp + timedelta(days=mute.days):
if member:
await member.remove_roles(mute_role)
try:
await member.remove_timeout()
except Forbidden:
await send_alert(guild, t.cannot_remove_timeout(member.mention, member.id))
else:
member = mute.member, mute.member_name
await send_to_changelog_mod(guild, None, Colors.unmute, t.log_unmuted, member, t.log_unmuted_expired)
await Mute.deactivate(mute.id)
elif member and mute.days == -1:
try:
await member.timeout_for(MAX_TIMEOUT)
except Forbidden:
await send_alert(guild, t.cannot_update_timeout(member.mention, member.id))
elif member and (
not timeout or timeout + timedelta(seconds=2) < mute.timestamp + timedelta(days=mute.days)
):
delta = min(mute.timestamp + timedelta(days=mute.days) - utcnow(), MAX_TIMEOUT)
try:
await member.timeout_for(delta)
except Forbidden:
await send_alert(guild, t.cannot_update_timeout(member.mention, member.id))
@log_auto_kick.subscribe
async def handle_log_auto_kick(self, member: Member):
await Kick.create(member.id, str(member), None, None)
@get_user_info_entries.subscribe
async def handle_get_user_stats_entries(self, user_id: int) -> list[tuple[str, str]]:
out: list[tuple[str, str]] = []
async def count(cls):
if cls is Report:
active = await db.count(filter_by(cls, reporter=user_id))
else:
active = await db.count(filter_by(cls, mod=user_id))
passive = await db.count(filter_by(cls, member=user_id))
if cls is Kick:
if auto_kicks := await db.count(filter_by(cls, member=user_id, mod=None)):
return t.active_passive(active, passive - auto_kicks) + "\n" + t.autokicks(cnt=auto_kicks)
return t.active_passive(active, passive)
out.append((t.reported_cnt, await count(Report)))
out.append((t.warned_cnt, await count(Warn)))
out.append((t.muted_cnt, await count(Mute)))
out.append((t.kicked_cnt, await count(Kick)))
out.append((t.banned_cnt, await count(Ban)))
return out
@get_user_status_entries.subscribe
async def handle_get_user_status_entries(self, user_id: int) -> list[tuple[str, str]]:
status = t.none
if (ban := await db.get(Ban, member=user_id, active=True)) is not None:
if ban.days != -1:
expiry_date: datetime = ban.timestamp + timedelta(days=ban.days)
days_left = (expiry_date - utcnow()).days + 1
status = t.status_banned_days(cnt=ban.days, left=days_left)
else:
status = t.status_banned
elif (mute := await db.get(Mute, member=user_id, active=True)) is not None:
if mute.days != -1:
expiry_date: datetime = mute.timestamp + timedelta(days=mute.days)
days_left = (expiry_date - utcnow()).days + 1
status = t.status_muted_days(cnt=mute.days, left=days_left)
else:
status = t.status_muted
return [(t.active_sanctions, status)]
@get_userlog_entries.subscribe
async def handle_get_userlog_entries(self, user_id: int, author: Member) -> list[tuple[datetime, str]]:
out: list[tuple[datetime, str]] = []
if await is_teamler(author):
report: Report
async for report in await db.stream(filter_by(Report, member=user_id)):
out.append((report.timestamp, t.ulog.reported(f"<@{report.reporter}>", report.reason)))
warn: Warn
async for warn in await db.stream(filter_by(Warn, member=user_id)):
out.append((warn.timestamp, t.ulog.warned(f"<@{warn.mod}>", warn.reason)))
mute: Mute
async for mute in await db.stream(filter_by(Mute, member=user_id)):
text = t.ulog.muted.upgrade if mute.is_upgrade else t.ulog.muted.first
if mute.days == -1:
out.append((mute.timestamp, text.inf(f"<@{mute.mod}>", mute.reason)))
else:
out.append((mute.timestamp, text.temp(f"<@{mute.mod}>", mute.reason, cnt=mute.days)))
if not mute.active and not mute.upgraded:
if mute.unmute_mod is None:
out.append((mute.deactivation_timestamp, t.ulog.unmuted_expired))
else:
out.append(
(mute.deactivation_timestamp, t.ulog.unmuted(f"<@{mute.unmute_mod}>", mute.unmute_reason))
)
kick: Kick
async for kick in await db.stream(filter_by(Kick, member=user_id)):
if kick.mod is not None:
out.append((kick.timestamp, t.ulog.kicked(f"<@{kick.mod}>", kick.reason)))
else:
out.append((kick.timestamp, t.ulog.autokicked))
ban: Ban
async for ban in await db.stream(filter_by(Ban, member=user_id)):
text = t.ulog.banned.upgrade if ban.is_upgrade else t.ulog.banned.first
if ban.days == -1:
out.append((ban.timestamp, text.inf(f"<@{ban.mod}>", ban.reason)))
else:
out.append((ban.timestamp, text.temp(f"<@{ban.mod}>", ban.reason, cnt=ban.days)))
if not ban.active and not ban.upgraded:
if ban.unban_mod is None:
out.append((ban.deactivation_timestamp, t.ulog.unbanned_expired))
else:
out.append((ban.deactivation_timestamp, t.ulog.unbanned(f"<@{ban.unban_mod}>", ban.unban_reason)))
return out
async def on_member_join(self, member: Member):
mute: Mute | None = await db.get(Mute, active=True, member=member.id)
if not mute:
if member.timed_out:
try:
await member.remove_timeout()
except Forbidden:
await send_alert(member.guild, t.cannot_remove_timeout(member.mention, member.id))
return
if mute_role := member.guild.get_role(await RoleSettings.get("mute")):
await member.add_roles(mute_role)
if mute.days == -1:
delta = MAX_TIMEOUT
else:
delta = min(mute.timestamp + timedelta(days=mute.days) - utcnow(), MAX_TIMEOUT)
try:
await member.timeout_for(delta)
except Forbidden:
await send_alert(member.guild, t.cannot_update_timeout(member.mention, member.id))
@commands.command()
@guild_only()
async def report(self, ctx: Context, user: UserMemberConverter, *, reason: str):
"""
report a user
"""
user: Union[Member, User]
if len(reason) > 900:
raise CommandError(t.reason_too_long)
if user == self.bot.user:
raise UserCommandError(user, t.cannot_report)
if user == ctx.author:
raise UserCommandError(user, t.no_self_report)
await Report.create(user.id, str(user), ctx.author.id, reason)
server_embed = Embed(title=t.report, description=t.reported_response, colour=Colors.ModTools)
server_embed.set_author(name=str(user), icon_url=user.display_avatar.url)
await reply(ctx, embed=server_embed)
await send_to_changelog_mod(ctx.guild, ctx.message, Colors.report, t.log_reported, user, reason)
@commands.command()
@ModPermission.warn.check
@guild_only()
async def warn(self, ctx: Context, user: UserMemberConverter, *, reason: str):
"""
warn a user
"""
user: Union[Member, User]
if len(reason) > 900:
raise CommandError(t.reason_too_long)
if user == self.bot.user:
raise UserCommandError(user, t.cannot_warn)
user_embed = Embed(
title=t.warn, description=t.warned(ctx.author.mention, ctx.guild.name, reason), colour=Colors.ModTools
)
server_embed = Embed(title=t.warn, description=t.warned_response, colour=Colors.ModTools)
server_embed.set_author(name=str(user), icon_url=user.display_avatar.url)
try:
await user.send(embed=user_embed)
except (Forbidden, HTTPException):
server_embed.description = t.no_dm + "\n\n" + server_embed.description
server_embed.colour = Colors.error
await Warn.create(user.id, str(user), ctx.author.id, reason)
await reply(ctx, embed=server_embed)
await send_to_changelog_mod(ctx.guild, ctx.message, Colors.warn, t.log_warned, user, reason)
@commands.command()
@ModPermission.mute.check
@guild_only()
async def mute(self, ctx: Context, user: UserMemberConverter, days: DurationConverter, *, reason: str):
"""
mute a user
set days to `inf` for a permanent mute
"""
user: Union[Member, User]
days: Optional[int]
if len(reason) > 900:
raise CommandError(t.reason_too_long)
mute_role: Role = await get_mute_role(ctx.guild)
if user == self.bot.user or await is_teamler(user):
raise UserCommandError(user, t.cannot_mute)
if isinstance(user, Member):
check_role_assignable(mute_role)
try:
await user.timeout_for(min(timedelta(days=days), MAX_TIMEOUT) if days else MAX_TIMEOUT)
except Forbidden:
raise CommandError(t.cannot_mute)
await user.add_roles(mute_role)
active_mutes: List[Mute] = await db.all(filter_by(Mute, active=True, member=user.id))
for mute in active_mutes:
if mute.days == -1:
raise UserCommandError(user, t.already_muted)
ts = mute.timestamp + timedelta(days=mute.days)
if days is not None and utcnow() + timedelta(days=days) <= ts:
raise UserCommandError(user, t.already_muted)
for mute in active_mutes:
await Mute.upgrade(mute.id, ctx.author.id)
user_embed = Embed(title=t.mute, colour=Colors.ModTools)
server_embed = Embed(title=t.mute, description=t.muted_response, colour=Colors.ModTools)
server_embed.set_author(name=str(user), icon_url=user.display_avatar.url)
if days is not None:
await Mute.create(user.id, str(user), ctx.author.id, days, reason, bool(active_mutes))
user_embed.description = t.muted(ctx.author.mention, ctx.guild.name, reason, cnt=days)
await send_to_changelog_mod(
ctx.guild, ctx.message, Colors.mute, t.log_muted, user, reason, duration=t.log_field.days(cnt=days)
)
else:
await Mute.create(user.id, str(user), ctx.author.id, -1, reason, bool(active_mutes))
user_embed.description = t.muted_inf(ctx.author.mention, ctx.guild.name, reason)
await send_to_changelog_mod(
ctx.guild, ctx.message, Colors.mute, t.log_muted, user, reason, duration=t.log_field.days_infinity
)
try:
await user.send(embed=user_embed)
except (Forbidden, HTTPException):
server_embed.description = t.no_dm + "\n\n" + server_embed.description
server_embed.colour = Colors.error
await reply(ctx, embed=server_embed)
@commands.command()
@ModPermission.mute.check
@guild_only()
async def unmute(self, ctx: Context, user: UserMemberConverter, *, reason: str):
"""
unmute a user
"""
user: Union[Member, User]
if len(reason) > 900:
raise CommandError(t.reason_too_long)
mute_role: Role = await get_mute_role(ctx.guild)
was_muted = False
if isinstance(user, Member) and (mute_role in user.roles or user.timed_out):
was_muted = True
check_role_assignable(mute_role)
try:
await user.remove_timeout()
except Forbidden:
raise CommandError(t.cannot_unmute)
await user.remove_roles(mute_role)
async for mute in await db.stream(filter_by(Mute, active=True, member=user.id)):
await Mute.deactivate(mute.id, ctx.author.id, reason)
was_muted = True
if not was_muted:
raise UserCommandError(user, t.not_muted)
server_embed = Embed(title=t.unmute, description=t.unmuted_response, colour=Colors.ModTools)
server_embed.set_author(name=str(user), icon_url=user.display_avatar.url)
await reply(ctx, embed=server_embed)
await send_to_changelog_mod(ctx.guild, ctx.message, Colors.unmute, t.log_unmuted, user, reason)
@commands.command()
@ModPermission.kick.check
@guild_only()
async def kick(self, ctx: Context, member: Member, *, reason: str):
"""
kick a member
"""
if len(reason) > 900:
raise CommandError(t.reason_too_long)
if member == self.bot.user or await is_teamler(member):
raise UserCommandError(member, t.cannot_kick)
if not ctx.guild.me.guild_permissions.kick_members:
raise CommandError(t.cannot_kick_permissions)
if member.top_role >= ctx.guild.me.top_role or member.id == ctx.guild.owner_id:
raise UserCommandError(member, t.cannot_kick)
await Kick.create(member.id, str(member), ctx.author.id, reason)
await send_to_changelog_mod(ctx.guild, ctx.message, Colors.kick, t.log_kicked, member, reason)
user_embed = Embed(
title=t.kick, description=t.kicked(ctx.author.mention, ctx.guild.name, reason), colour=Colors.ModTools
)
server_embed = Embed(title=t.kick, description=t.kicked_response, colour=Colors.ModTools)
server_embed.set_author(name=str(member), icon_url=member.display_avatar.url)
try:
await member.send(embed=user_embed)
except (Forbidden, HTTPException):
server_embed.description = t.no_dm + "\n\n" + server_embed.description
server_embed.colour = Colors.error
await member.kick(reason=reason)
await revoke_verification(member)
await reply(ctx, embed=server_embed)
@commands.command()
@ModPermission.ban.check
@guild_only()
async def ban(
self, ctx: Context, user: UserMemberConverter, ban_days: DurationConverter, delete_days: int, *, reason: str
):
"""
ban a user
set ban_days to `inf` for a permanent ban
"""
ban_days: Optional[int]
user: Union[Member, User]
if not ctx.guild.me.guild_permissions.ban_members:
raise CommandError(t.cannot_ban_permissions)
if len(reason) > 900:
raise CommandError(t.reason_too_long)
if not 0 <= delete_days <= 7:
raise CommandError(tg.invalid_duration)
if user == self.bot.user or await is_teamler(user):
raise UserCommandError(user, t.cannot_ban)
if isinstance(user, Member) and (user.top_role >= ctx.guild.me.top_role or user.id == ctx.guild.owner_id):
raise UserCommandError(user, t.cannot_ban)
active_bans: List[Ban] = await db.all(filter_by(Ban, active=True, member=user.id))
for ban in active_bans:
if ban.days == -1:
raise UserCommandError(user, t.already_banned)
ts = ban.timestamp + timedelta(days=ban.days)
if ban_days is not None and utcnow() + timedelta(days=ban_days) <= ts:
raise UserCommandError(user, t.already_banned)
for ban in active_bans:
await Ban.upgrade(ban.id, ctx.author.id)
async for mute in await db.stream(filter_by(Mute, active=True, member=user.id)):
await Mute.upgrade(mute.id, ctx.author.id)
user_embed = Embed(title=t.ban, colour=Colors.ModTools)
server_embed = Embed(title=t.ban, description=t.banned_response, colour=Colors.ModTools)
server_embed.set_author(name=str(user), icon_url=user.display_avatar.url)
if ban_days is not None:
await Ban.create(user.id, str(user), ctx.author.id, ban_days, reason, bool(active_bans))
user_embed.description = t.banned(ctx.author.mention, ctx.guild.name, reason, cnt=ban_days)
await send_to_changelog_mod(
ctx.guild, ctx.message, Colors.ban, t.log_banned, user, reason, duration=t.log_field.days(cnt=ban_days)
)
else:
await Ban.create(user.id, str(user), ctx.author.id, -1, reason, bool(active_bans))
user_embed.description = t.banned_inf(ctx.author.mention, ctx.guild.name, reason)
await send_to_changelog_mod(
ctx.guild, ctx.message, Colors.ban, t.log_banned, user, reason, duration=t.log_field.days_infinity
)
try:
await user.send(embed=user_embed)
except (Forbidden, HTTPException):
server_embed.description = t.no_dm + "\n\n" + server_embed.description
server_embed.colour = Colors.error
await ctx.guild.ban(user, delete_message_days=delete_days, reason=reason)
await revoke_verification(user)
await reply(ctx, embed=server_embed)
@commands.command()
@ModPermission.ban.check
@guild_only()
async def unban(self, ctx: Context, user: UserMemberConverter, *, reason: str):
"""
unban a user
"""
user: Union[Member, User]
if len(reason) > 900:
raise CommandError(t.reason_too_long)
if not ctx.guild.me.guild_permissions.ban_members:
raise CommandError(t.cannot_unban_permissions)
was_banned = True
try:
await ctx.guild.unban(user, reason=reason)
except HTTPException:
was_banned = False
async for ban in await db.stream(filter_by(Ban, active=True, member=user.id)):
was_banned = True
await Ban.deactivate(ban.id, ctx.author.id, reason)
if not was_banned:
raise UserCommandError(user, t.not_banned)
server_embed = Embed(title=t.unban, description=t.unbanned_response, colour=Colors.ModTools)
server_embed.set_author(name=str(user), icon_url=user.display_avatar.url)
await reply(ctx, embed=server_embed)
await send_to_changelog_mod(ctx.guild, ctx.message, Colors.unban, t.log_unbanned, user, reason)