PyDrocsid/cogs

View on GitHub
information/inactivity/cog.py

Summary

Maintainability
A
0 mins
Test Coverage
import asyncio
from datetime import datetime, timedelta
from typing import Optional

from discord import Embed, Guild, Member, Message, NotFound, Permissions, Role, Status, TextChannel
from discord.ext import commands
from discord.ext.commands import CommandError, Context, guild_only, max_concurrency
from discord.utils import format_dt, utcnow

from PyDrocsid.async_thread import run_as_task, semaphore_gather
from PyDrocsid.cog import Cog
from PyDrocsid.command import optional_permissions, reply
from PyDrocsid.config import Contributor
from PyDrocsid.database import db, db_wrapper
from PyDrocsid.embeds import send_long_embed
from PyDrocsid.translations import t

from .models import Activity
from .permissions import InactivityPermission
from .settings import InactivitySettings
from ...pubsub import get_user_status_entries, ignore_message_edit, send_to_changelog


tg = t.g
t = t.inactivity


def status_icon(status: Status) -> str:
    return {
        Status.online: ":green_circle:",
        Status.idle: ":yellow_circle:",
        Status.dnd: ":red_circle:",
        Status.offline: ":black_circle:",
    }[status]


@run_as_task
async def scan(ctx: Context, days: int):
    async def update_msg(m: Message, content):
        embed.description = content
        embed.timestamp = utcnow()
        await ignore_message_edit(m)
        try:
            await m.edit(embed=embed)
        except NotFound:
            return await reply(ctx, embed=embed)
        return m

    embed = Embed(title=t.scanning, timestamp=utcnow())
    message: list[Message] = [await reply(ctx, embed=embed)]
    guild: Guild = ctx.guild
    members: dict[Member, datetime] = {}
    active: dict[TextChannel, int] = {}
    completed: list[TextChannel] = []

    async def update_progress_message():
        while len(completed) < len(channels):
            content = t.scanning_channel(len(completed), len(channels), cnt=len(active))
            for a, d in active.items():
                channel_age = (utcnow() - a.created_at).days
                content += f"\n:small_orange_diamond: {a.mention} ({d} / {min(channel_age, days)})"
            message[0] = await update_msg(message[0], content)
            await asyncio.sleep(2)

    async def update_members(c: TextChannel):
        active[c] = 0

        async for msg in c.history(limit=None, oldest_first=False):
            s = (utcnow() - msg.created_at).total_seconds()
            if s > days * 24 * 60 * 60:
                break
            members[msg.author] = max(members.get(msg.author, msg.created_at), msg.created_at)
            active[c] = int(s / (24 * 60 * 60))

        del active[c]
        completed.append(c)

    channels: list[TextChannel] = []
    for channel in guild.text_channels:
        permissions: Permissions = channel.permissions_for(ctx.me)
        if permissions.read_messages and permissions.read_message_history:
            channels.append(channel)

    task = asyncio.create_task(update_progress_message())
    try:
        await semaphore_gather(10, *map(update_members, channels))
    finally:
        task.cancel()

    await update_msg(message[0], t.scan_complete(cnt=len(guild.text_channels)))

    embed = Embed(title=t.updating_members)
    message: Message = await reply(ctx, embed=embed)

    await semaphore_gather(50, *[db_wrapper(Activity.update)(m.id, ts) for m, ts in members.items()])

    await update_msg(message, t.updated_members(cnt=len(members)))


class InactivityCog(Cog, name="Inactivity"):
    CONTRIBUTORS = [Contributor.Defelo]

    async def on_message(self, message: Message):
        if message.guild is None:
            return

        await Activity.update(message.author.id, message.created_at)

        role: Role
        for role in message.role_mentions:
            await Activity.update(role.id, message.created_at)

    @commands.command()
    @InactivityPermission.scan.check
    @max_concurrency(1)
    @guild_only()
    async def scan(self, ctx: Context, days: int):
        """
        scan all channels for latest message of each user
        """

        if days <= 0:
            raise CommandError(tg.invalid_duration)

        await scan(ctx, days)

    @get_user_status_entries.subscribe
    async def handle_get_user_status_entries(self, user_id) -> list[tuple[str, str]]:
        inactive_days = await InactivitySettings.inactive_days.get()

        activity: Optional[Activity] = await db.get(Activity, id=user_id)

        if activity is None:
            status = t.status.inactive
        elif (utcnow() - activity.timestamp).days >= inactive_days:
            status = t.status.inactive_since(format_dt(activity.timestamp, style="R"))
        else:
            status = t.status.active(format_dt(activity.timestamp, style="R"))

        return [(t.activity, status)]

    @commands.command(aliases=["in"])
    @InactivityPermission.read.check
    @guild_only()
    async def inactive(self, ctx: Context, days: Optional[int], *roles: Optional[Role]):
        """
        list inactive users
        """

        if role := ctx.guild.get_role(days):
            roles += (role,)
            days = None

        if days is None:
            days = await InactivitySettings.inactive_days.get()
        elif days not in range(1, 10001):
            raise CommandError(tg.invalid_duration)

        now = utcnow()

        @db_wrapper
        async def load_member(m: Member) -> tuple[Member, Optional[datetime]]:
            ts = await db.get(Activity, id=m.id)
            return m, ts.timestamp if ts else None

        if roles:
            members: set[Member] = {member for role in roles for member in role.members}
        else:
            members: set[Member] = set(ctx.guild.members)

        last_activity: list[tuple[Member, Optional[datetime]]] = await semaphore_gather(50, *map(load_member, members))
        last_activity.sort(key=lambda a: (a[1].timestamp() if a[1] else -1, str(a[0])))

        out = []
        for member, timestamp in last_activity:
            if timestamp is None:
                out.append(t.user_inactive(status_icon(member.status), member.mention, f"@{member}"))
            elif timestamp >= now - timedelta(days=days):
                break
            else:
                out.append(
                    t.user_inactive_since(
                        status_icon(member.status), member.mention, f"@{member}", format_dt(timestamp, style="R")
                    )
                )

        embed = Embed(title=t.inactive_users, colour=0x256BE6)
        if out:
            embed.title = t.inactive_users_cnt(len(out))
            embed.description = "\n".join(out)
        else:
            embed.description = t.no_inactive_users
            embed.colour = 0x03AD28
        await send_long_embed(ctx, embed, paginate=True)

    @commands.command(aliases=["indur"])
    @InactivityPermission.read.check
    @optional_permissions(InactivityPermission.write)
    @guild_only()
    async def inactive_duration(self, ctx: Context, days: Optional[int]):
        """
        configure inactivity duration
        """

        if days is None:
            days = await InactivitySettings.inactive_days.get()
            await reply(ctx, t.inactive_duration(cnt=days))
            return

        if not await InactivityPermission.write.check_permissions(ctx.author):
            raise CommandError(tg.not_allowed)

        if days not in range(1, 10001):
            raise CommandError(tg.invalid_duration)

        await InactivitySettings.inactive_days.set(days)
        await reply(ctx, t.inactive_duration_set(cnt=days))
        await send_to_changelog(ctx.guild, t.inactive_duration_set(cnt=days))