PyDrocsid/cogs

View on GitHub
integrations/adventofcode/cog.py

Summary

Maintainability
A
1 hr
Test Coverage
import os
import re
import time
from datetime import datetime, timedelta, timezone
from typing import Optional, Union

import requests
from discord import Embed, Guild, Member, Role, User
from discord.ext import commands, tasks
from discord.ext.commands import CommandError, Context, UserInputError, guild_only

from PyDrocsid.cog import Cog
from PyDrocsid.command import reply
from PyDrocsid.database import db, db_wrapper, select
from PyDrocsid.embeds import send_long_embed
from PyDrocsid.emojis import name_to_emoji
from PyDrocsid.translations import t
from PyDrocsid.util import check_role_assignable

from .colors import Colors
from .models import AOCLink
from .permissions import AdventOfCodePermission
from .settings import AdventOfCodeSettings
from ...contributor import Contributor
from ...pubsub import send_to_changelog


# `tg` keeps a handle on the `g` attribute of the imported translations object
# (presumably the global/shared strings — confirm against PyDrocsid).
# It must be captured BEFORE `t` is rebound on the next line.
tg = t.g
# From here on, `t` refers to the `adventofcode` translation namespace only.
t = t.adventofcode

# Root URL that the request paths used in this module are appended to.
BASE_URL = "https://adventofcode.com/"


class AOCConfig:
    """Class-level Advent of Code configuration and leaderboard cache.

    All state lives on the class itself (every method is a classmethod), so
    there is a single shared configuration/cache for the whole process.
    """

    # Populated by load(); they stay None until load() succeeds.
    YEAR = None
    SESSION = None
    USER_ID = None
    INVITE_CODE = None
    LEADERBOARD_URL = None
    REFRESH_INTERVAL = None

    # Unix timestamp of the most recent refresh attempt and the cached payload.
    last_update = 0
    _leaderboard = None

    # Optional coroutine function awaited with the leaderboard after a refresh.
    update_hook = None

    @classmethod
    def load(cls) -> bool:
        """Read the configuration from the environment and the AoC website.

        Reads ``AOC_SESSION`` (required) and ``AOC_REFRESH_INTERVAL``
        (seconds, default 900) from the environment and requests the private
        leaderboard page to discover the year, invite code and user id.

        :return: True if everything could be determined, False otherwise
                 (missing session, failed request, or a response that does
                 not look like the expected private leaderboard page).
        """

        cls.SESSION = os.getenv("AOC_SESSION")
        if not cls.SESSION:
            return False

        response = requests.get(BASE_URL + "leaderboard/private", cookies={"session": cls.SESSION})
        if not response.ok or not response.url.endswith("private"):
            return False

        # The final URL is expected to look like https://adventofcode.com/<year>/leaderboard/private,
        # so the fourth "/"-separated component is the year.
        try:
            year = int(response.url.split("/")[3])
        except (IndexError, ValueError):
            return False

        # The page shows the invite code as "<user id>-<hex digits>" inside a <code> element.
        match = re.search(r"<code>((\d+)-[\da-f]+)</code>", response.text)
        if not match:
            return False

        cls.YEAR = year
        cls.INVITE_CODE, cls.USER_ID = match.groups()
        cls.LEADERBOARD_URL = BASE_URL + f"{cls.YEAR}/leaderboard/private/view/{cls.USER_ID}.json"

        cls.REFRESH_INTERVAL = int(os.getenv("AOC_REFRESH_INTERVAL", "900"))

        return True

    @classmethod
    def _request(cls, url):
        """Perform a GET request to ``url`` authenticated with the session cookie."""

        return requests.get(url, cookies={"session": cls.SESSION})

    @classmethod
    async def get_leaderboard(cls, disable_hook: bool = False) -> dict:
        """Return the leaderboard, refreshing the cache if it is too old.

        On refresh the members are re-ordered (highest local score first, then
        most stars, then earliest last star) and each member dict receives a
        1-based ``"rank"`` entry. Afterwards ``update_hook`` is awaited with
        the new leaderboard unless ``disable_hook`` is set.

        :param disable_hook: skip ``update_hook`` even if a refresh happens
        :return: the (possibly freshly fetched) leaderboard dict
        """

        ts = time.time()
        if ts - cls.last_update >= cls.REFRESH_INTERVAL:
            # The timestamp is stored before the request, so a failing request
            # is not retried until the interval has elapsed again (or the
            # cache is cleared by resetting last_update).
            cls.last_update = ts
            cls._leaderboard = cls._request(cls.LEADERBOARD_URL).json()

            # reverse=True sorts descending, hence the negated timestamp to
            # prefer members who obtained their last star earlier.
            members = cls._leaderboard["members"] = dict(
                sorted(
                    cls._leaderboard["members"].items(),
                    reverse=True,
                    key=lambda m: (m[1]["local_score"], m[1]["stars"], -int(m[1]["last_star_ts"])),
                )
            )
            for i, member in enumerate(members.values()):
                member["rank"] = i + 1

            if cls.update_hook and not disable_hook:
                await cls.update_hook(cls._leaderboard)

        return cls._leaderboard

    @classmethod
    async def get_member(cls, name: str) -> Optional[dict]:
        """Look up a leaderboard member by key or by name.

        :param name: either a key of the ``members`` mapping or a member name
                     (compared case-insensitively, ignoring surrounding
                     whitespace)
        :return: the member dict, or None if nobody matches
        """

        members = (await cls.get_leaderboard())["members"]

        if name in members:
            return members[name]

        wanted = name.lower().strip()
        for member in members.values():
            # Anonymous members have no name and can only be found by key.
            if member["name"] is not None and member["name"].lower().strip() == wanted:
                return member

        return None

    @classmethod
    async def find_member(cls, member: Union[User, Member]) -> tuple[Optional[dict], Optional[AOCLink]]:
        """Find the leaderboard entry belonging to a discord user.

        A stored link takes precedence; without one, the guild nickname (if
        any) and then the username are tried as AoC names.

        :param member: the discord user or guild member to look up
        :return: ``(aoc member dict or None, link or None)``; the link is only
                 returned if the match is based on a stored link
        """

        if link := await db.get(AOCLink, discord_id=member.id):
            return await cls.get_member(link.aoc_id), link

        if isinstance(member, Member) and member.nick:
            if result := await cls.get_member(member.nick):
                return result, None
        return await cls.get_member(member.name), None


def make_leaderboard(members: list[tuple[int, int, int, Optional[str]]]) -> str:
    """Render leaderboard rows as a fixed-width table inside a ``css`` code block.

    :param members: rows of ``(rank, score, stars, name)``; a missing name
                    (None) is rendered as an empty string and names are cut
                    off after 50 characters
    :return: the table (header line plus one line per row) wrapped in a
             markdown code block; an empty list yields just the header
    """

    # Column widths are derived from the widest value of each numeric column.
    # The defaults keep the function usable for an empty list.
    rank_len = max((len(str(rank)) for rank, _, _, _ in members), default=1)
    score_len = max((len(str(score)) for _, score, _, _ in members), default=0)
    stars_len = max((len(str(stars)) for _, _, stars, _ in members), default=0)

    # Never make the columns narrower than their header needs.
    score_len = max(score_len, 3)
    stars_len = max(stars_len, 5)

    out = [f" {' ' * rank_len}  {'SCORE':>{score_len + 2}}  STARS  NAME"]
    for rank, score, stars, name in members:
        shown_name = (name or "")[:50]
        out.append(f"#{rank:0{rank_len}}  [{score:{score_len}}]  {stars:{stars_len}}  {shown_name}")

    return "```css\n" + "\n".join(out) + "\n```"


def make_member_stats(member: dict, year: Optional[int] = None) -> tuple[int, list[str]]:
    """Build the per-day completion table of a leaderboard member.

    For every day (1-25) the member has an entry for, one line is produced
    that shows how long after the puzzle release (05:00 UTC on that day of
    December) each part was solved.

    :param member: leaderboard member dict; ``completion_day_level`` maps the
                   day number (as str) to a dict keyed by part (``"1"``,
                   ``"2"``) whose values contain ``get_star_ts``
    :param year: event year used for the release times; defaults to
                 ``AOCConfig.YEAR``
    :return: ``(number of completed parts, table lines including the header)``
    """

    if year is None:
        year = AOCConfig.YEAR

    stars = ["Day  Part #1          Part #2"]
    completed = 0
    for day_number in range(1, 26):
        day = member["completion_day_level"].get(str(day_number))
        if not day:
            continue

        release_ts = datetime(year, 12, day_number, 5, 0, 0, tzinfo=timezone.utc).timestamp()

        line = f" {day_number:02}"
        for part in "12":
            # Part 2 cannot be shown without part 1, so stop at the first gap.
            if part not in day:
                break

            completed += 1
            delta = timedelta(seconds=int(day[part]["get_star_ts"]) - release_ts)
            d, h, m, s = delta.days, delta.seconds // 3600, delta.seconds // 60 % 60, delta.seconds % 60
            line += f"  {d:2}d {h:2}h {m:2}m {s:2}s"
        stars.append(line)

    return completed, stars


def escape_aoc_name(name: Optional[str]) -> str:
    """Reduce an AoC name to alphanumeric characters, spaces, ``_`` and ``-``.

    :param name: the raw name; may be None or empty
    :return: the filtered name, or an empty string if no name was given
    """

    if not name:
        return ""

    allowed_extra = " _-"
    kept = [char for char in name if char.isalnum() or char in allowed_extra]
    return "".join(kept)


# Alternative get facility
def get_git_repo(url: str) -> Optional[str]:
    """Resolve a user-supplied repository URL via the hosting service's API.

    Supported hosts are gitlab.com, gitea.com and github.com; the scheme is
    optional and anything after ``<user>/<repo>`` is kept as a path suffix.
    The repository is looked up through the host's API, the web URL reported
    by the API is combined with the path suffix and the result is probed
    with a HEAD request.

    :param url: the URL entered by the user
    :return: the resolved URL, or None if the URL does not belong to a
             supported host, the API lookup fails or the final URL is not
             reachable
    """

    # (host, API endpoint template, key of the web URL in the API response)
    servers = [
        ("gitlab.com", "https://gitlab.com/api/v4/projects/{user}%2F{repo}", "web_url"),
        ("gitea.com", "https://gitea.com/api/v1/repos/{user}/{repo}", "html_url"),
        ("github.com", "https://api.github.com/repos/{user}/{repo}", "html_url"),
    ]
    name = r"[a-zA-Z0-9.\-_]+"

    for host, api, web_url_key in servers:
        # The host is escaped so that "." only matches a literal dot.
        pattern = rf"^(?:https?://)?{re.escape(host)}/({name})/({name})(/.*)?$"
        if not (match := re.match(pattern, url)):
            continue

        user, repo, path = match.groups()
        response = requests.get(api.format(user=user, repo=repo))
        if not response.ok:
            return None

        repo_url = response.json()[web_url_key] + (path or "")
        if not requests.head(repo_url).ok:
            return None
        return repo_url

    return None


# Alternative parsing facility
def parse_git_url(url: str) -> tuple[str, str]:
    """Extract ``(user, repo)`` from an https repository URL.

    Only ``https://`` URLs on gitlab.com, gitea.com and github.com are
    recognised; everything after the repository component is ignored.

    :param url: the repository URL
    :return: ``(user, repo)``, or ``("", "")`` if the URL is not recognised
    """

    # The dot is escaped so that it only matches a literal ".".
    match = re.match(r"^https://(?:gitlab|gitea|github)\.com/([^/]+)/([^/]+)", url)
    if match is None:
        return "", ""  # TODO how handle error

    user, repo = match.groups()
    return user, repo


class AdventOfCodeCog(Cog, name="Advent of Code Integration"):
    # Cog providing the `aoc` command group: private leaderboard display, user
    # stats, links between discord members and AoC users, an optional role for
    # the top ranked members and published solution repositories.
    #
    # The class and the command callbacks are documented with comments only;
    # the existing command docstrings are kept unchanged on purpose.
    CONTRIBUTORS = [Contributor.Defelo]

    def __init__(self):
        super().__init__()

        # Re-evaluate the role assignment whenever the leaderboard is refreshed.
        AOCConfig.update_hook = self.update_roles

    @staticmethod
    def prepare() -> bool:
        """Load the AoC configuration and report whether that succeeded."""

        return AOCConfig.load()

    async def on_ready(self):
        """(Re)start the periodic leaderboard refresh task."""

        self.aoc_loop.cancel()
        try:
            self.aoc_loop.start()
        except RuntimeError:
            # start() refused to start the task; fall back to restart().
            self.aoc_loop.restart()

    @tasks.loop(minutes=1)
    @db_wrapper
    async def aoc_loop(self):
        """Poll the leaderboard; the cache decides whether a request is made."""

        await AOCConfig.get_leaderboard()

    async def update_roles(self, leaderboard: dict):
        """Synchronise the configured role with the top ranked linked members.

        The role is given to every linked guild member among the first
        ``rank`` leaderboard entries with a positive local score and removed
        from everyone else who currently has it. Nothing happens if the bot is
        in no guild or the role is not configured.

        :param leaderboard: leaderboard dict whose ``members`` are ordered by rank
        """

        if not self.bot.guilds:
            return

        guild: Guild = self.bot.guilds[0]
        role: Optional[Role] = guild.get_role(await AdventOfCodeSettings.role.get())
        if not role:
            return
        rank: int = await AdventOfCodeSettings.rank.get()

        new_members: set[Member] = set()
        for aoc_member in list(leaderboard["members"].values())[:rank]:
            if aoc_member["local_score"] <= 0:
                continue
            if link := await db.get(AOCLink, aoc_id=aoc_member["id"]):
                if discord_member := guild.get_member(link.discord_id):
                    new_members.add(discord_member)
        old_members: set[Member] = set(role.members)

        for member in old_members - new_members:
            await member.remove_roles(role)
        for member in new_members - old_members:
            await member.add_roles(role)

    async def get_from_aoc(self, aoc_name: str) -> tuple[Optional[dict], Optional[User], Optional[AOCLink]]:
        """Resolve an AoC name/id to its leaderboard entry and linked discord user.

        :param aoc_name: AoC name or id as accepted by ``AOCConfig.get_member``
        :return: ``(aoc member, discord user, link)``; user and link are None
                 unless a link exists and its discord user is known to the
                 bot; all three are None if the AoC member is unknown
        """

        aoc_member = await AOCConfig.get_member(aoc_name)
        if not aoc_member:
            return None, None, None

        if link := await db.get(AOCLink, aoc_id=aoc_member["id"]):
            if member := self.bot.get_user(link.discord_id):
                return aoc_member, member, link
        return aoc_member, None, None

    async def get_from_discord(
        self, member: User, ignore_link: bool
    ) -> tuple[Optional[dict], Optional[User], Optional[AOCLink]]:
        """Resolve a discord user to a leaderboard entry.

        If the entry was only found by name but is linked to a different
        discord user, the result depends on ``ignore_link``: when false,
        nothing is returned; when true, the entry is returned together with
        the user it is actually linked to.

        :param member: the discord user to look up
        :param ignore_link: see above
        :return: ``(aoc member, discord user, link)`` or three times None
        """

        aoc_member, link = await AOCConfig.find_member(member)
        if not aoc_member:
            return None, None, None
        if link:
            return aoc_member, member, link

        # Found by name only: check whether the entry belongs to someone else.
        if link := await db.get(AOCLink, aoc_id=aoc_member["id"]):
            if not ignore_link:
                return None, None, None

            return aoc_member, self.bot.get_user(link.discord_id), link

        return aoc_member, member, None

    @commands.group()
    async def aoc(self, ctx: Context):
        """
        Advent of Code Integration
        """

        if ctx.invoked_subcommand is None:
            raise UserInputError

    @aoc.command(name="join")
    async def aoc_join(self, ctx: Context):
        """
        request instructions on how to join the private leaderboard
        """

        await reply(
            ctx,
            embed=Embed(
                title=t.join_title, colour=Colors.AdventOfCode, description=t.join_instructions(AOCConfig.INVITE_CODE)
            ),
        )

    @aoc.command(name="leaderboard", aliases=["lb", "ranking"])
    async def aoc_leaderboard(self, ctx: Context):
        """
        show the current state of the private leaderboard
        """

        leaderboard = await AOCConfig.get_leaderboard()

        # Only the first 20 entries are shown.
        out = make_leaderboard(
            [
                (m["rank"], m["local_score"], m["stars"], escape_aoc_name(m["name"]) or f"[anonymous user #{m['id']}]")
                for m in list(leaderboard["members"].values())[:20]
            ]
        )

        embed = Embed(
            title=t.leaderboard_header(AOCConfig.YEAR),
            description=out,
            color=Colors.AdventOfCode,
            timestamp=datetime.fromtimestamp(AOCConfig.last_update, tz=timezone.utc),
        )
        embed.set_footer(text=t.last_update)

        await reply(ctx, embed=embed)

    @aoc.command(name="user")
    async def aoc_user(self, ctx: Context, *, user: Optional[Union[Member, str]]):
        """
        show stats of a specific user
        """

        # A plain string is treated as an AoC name; a member (or no argument,
        # meaning the author) is resolved via link or discord name.
        if isinstance(user, str):
            aoc_member, member, link = await self.get_from_aoc(user)
        else:
            aoc_member, member, link = await self.get_from_discord(user or ctx.author, user is not None)

        if not aoc_member:
            raise CommandError(tg.user_not_found)

        if aoc_member["name"]:
            name = f"{aoc_member['name']} (#{aoc_member['id']})"
        else:
            name = f"(anonymous user #{aoc_member['id']})"

        trophy = "trophy"
        # English ordinal suffix; the multiplication zeroes the lookup key for
        # 11-13 (tens digit 1) so that they get "th".
        rank = str(aoc_member["rank"]) + {1: "st", 2: "nd", 3: "rd"}.get(
            aoc_member["rank"] % 10 * (aoc_member["rank"] // 10 % 10 != 1), "th"
        )
        if aoc_member["rank"] <= await AdventOfCodeSettings.rank.get():
            rank = f"**{rank}**"
            trophy = "medal"
        if aoc_member["rank"] <= 3:
            trophy = ["first", "second", "third"][aoc_member["rank"] - 1] + "_place"

        completed, stars = make_member_stats(aoc_member)
        # Number of parts released so far: two per day since Dec 1st 05:00 UTC,
        # clamped to the 25 days of the event.
        unlocked = (datetime.now(tz=timezone.utc) - datetime(AOCConfig.YEAR, 12, 1, 5, 0, 0, tzinfo=timezone.utc)).days
        unlocked = max(0, min(25, unlocked + 1)) * 2
        if not unlocked:
            progress = "N/A"
        else:
            # Highlight the percentage if everything available is solved.
            full = "**" * (unlocked == completed)
            progress = f"{completed}/{unlocked} ({full}{completed / unlocked * 100:.1f}%{full})"

        embed = Embed(title=f"Advent of Code {AOCConfig.YEAR}", colour=Colors.AdventOfCode)
        icon_url = member.display_avatar.url if member else "https://adventofcode.com/favicon.png"
        embed.set_author(name=name, icon_url=icon_url)

        linked = f"<@{member.id}>" + " (unverified)" * (not link) if member else "Not Linked"
        embed.add_field(name=":link: Member", value=linked, inline=True)
        embed.add_field(name=":chart_with_upwards_trend: Progress", value=progress, inline=True)

        if link and link.solutions:
            user, repo = parse_git_url(link.solutions)
            embed.add_field(name=":package: Solutions", value=f"[[{user}/{repo}]]({link.solutions})", inline=True)

        embed.add_field(name=":star: Stars", value=aoc_member["stars"], inline=True)
        embed.add_field(name=f":{trophy}: Local Score", value=f"{aoc_member['local_score']} ({rank})", inline=True)
        embed.add_field(name=":globe_with_meridians: Global Score", value=aoc_member["global_score"], inline=True)

        embed.add_field(name="** **", value="```hs\n" + "\n".join(stars) + "\n```", inline=False)
        embed.set_footer(text="Last Update:")
        embed.timestamp = datetime.fromtimestamp(AOCConfig.last_update, tz=timezone.utc)

        await reply(ctx, embed=embed)

    @aoc.command(name="clear_cache", aliases=["clear", "cc"])
    @AdventOfCodePermission.clear.check
    async def aoc_clear_cache(self, ctx: Context):
        """
        clear the leaderboard cache to force a refresh on the next request
        """

        # A timestamp of 0 makes the cache look expired.
        AOCConfig.last_update = 0
        await ctx.message.add_reaction(name_to_emoji["white_check_mark"])

    @aoc.group(name="link", aliases=["l"])
    @AdventOfCodePermission.link_read.check
    async def aoc_link(self, ctx: Context):
        """
        manage links between discord members and aoc users on the private leaderboard
        """

        # More than two words means a subcommand was attempted: either it is
        # handled by that subcommand or the input is invalid.
        if len(ctx.message.content.lstrip(ctx.prefix).split()) > 2:
            if ctx.invoked_subcommand is None:
                raise UserInputError
            return

        embed = Embed(title=t.links, colour=Colors.AdventOfCode)
        leaderboard = await AOCConfig.get_leaderboard()
        out = []
        async for link in await db.stream(select(AOCLink)):  # type: AOCLink
            # Skip links whose AoC user left the leaderboard or whose discord
            # user is unknown to the bot.
            if link.aoc_id not in leaderboard["members"]:
                continue
            if not (user := self.bot.get_user(link.discord_id)):
                continue

            member = leaderboard["members"][link.aoc_id]
            if member["name"]:
                name = f"{member['name']} (#{member['id']})"
            else:
                name = f"(anonymous user #{member['id']})"

            out.append(f"{name} = <@{link.discord_id}> (`@{user}`)")

        if not out:
            embed.description = t.no_links
            embed.colour = Colors.error
        else:
            embed.description = "\n".join(out)
        await send_long_embed(ctx, embed)

    @aoc_link.command(name="add", aliases=["a", "+"])
    @AdventOfCodePermission.link_write.check
    async def aoc_link_add(self, ctx: Context, member: Member, *, aoc_user: str):
        """
        add a new link
        """

        aoc_member = await AOCConfig.get_member(aoc_user)
        if not aoc_member:
            raise CommandError(tg.user_not_found)

        # Both sides of a link must be unused.
        if await db.get(AOCLink, discord_id=member.id) or await db.get(AOCLink, aoc_id=aoc_member["id"]):
            raise CommandError(t.link_already_exists)

        await AOCLink.create(member.id, aoc_member["id"])
        await reply(ctx, t.link_created)

    @aoc_link.command(name="remove", aliases=["r", "del", "d", "-"])
    @AdventOfCodePermission.link_write.check
    async def aoc_link_remove(self, ctx: Context, *, member: Union[Member, str]):
        """
        remove a link
        """

        # The link can be addressed by discord member or by AoC name/id.
        if isinstance(member, Member):
            link = await db.get(AOCLink, discord_id=member.id)
        else:
            aoc_member = await AOCConfig.get_member(member)
            link = aoc_member and await db.get(AOCLink, aoc_id=aoc_member["id"])

        if not link:
            raise CommandError(t.link_not_found)

        await db.delete(link)
        await reply(ctx, t.link_removed)

    @aoc.group(name="role", aliases=["r"])
    @AdventOfCodePermission.role_read.check
    @guild_only()
    async def aoc_role(self, ctx: Context):
        """
        manage aoc role
        """

        # More than two words means a subcommand was attempted: either it is
        # handled by that subcommand or the input is invalid.
        if len(ctx.message.content.lstrip(ctx.prefix).split()) > 2:
            if ctx.invoked_subcommand is None:
                raise UserInputError
            return

        embed = Embed(title=tg.role)

        role: Optional[Role] = ctx.guild.get_role(await AdventOfCodeSettings.role.get())
        rank: int = await AdventOfCodeSettings.rank.get()

        if not role:
            embed.colour = Colors.error
            embed.add_field(name=tg.role, value=tg.disabled)
        else:
            embed.colour = role.colour
            embed.add_field(name=tg.role, value=role.mention)
        embed.add_field(name=t.min_rank, value=str(rank))

        # An embed field must not have an empty value, so the member list is
        # only added if somebody actually has the role.
        if role and role.members:
            embed.add_field(name=tg.members, value="\n".join(m.mention for m in role.members), inline=False)

        await send_long_embed(ctx, embed)

    @aoc_role.command(name="set", aliases=["s", "="])
    @AdventOfCodePermission.role_write.check
    async def aoc_role_set(self, ctx: Context, *, role: Role):
        """
        configure aoc role
        """

        check_role_assignable(role)

        old_role: Optional[Role] = ctx.guild.get_role(await AdventOfCodeSettings.role.get())

        await AdventOfCodeSettings.role.set(role.id)

        # Take the previous role away from everyone, then assign the new one
        # based on the current leaderboard.
        if old_role:
            for member in old_role.members:
                await member.remove_roles(old_role)
        await self.update_roles(await AOCConfig.get_leaderboard(disable_hook=True))

        await reply(ctx, t.role_set)
        await send_to_changelog(ctx.guild, t.log_role_set(role.name, role.id))

    @aoc_role.command(name="disable", aliases=["d", "off"])
    @AdventOfCodePermission.role_write.check
    async def aoc_role_disable(self, ctx: Context):
        """
        disable aoc role
        """

        role: Optional[Role] = ctx.guild.get_role(await AdventOfCodeSettings.role.get())

        await AdventOfCodeSettings.role.reset()

        if role:
            for member in role.members:
                await member.remove_roles(role)

        await reply(ctx, t.role_disabled)
        await send_to_changelog(ctx.guild, t.role_disabled)

    @aoc_role.command(name="rank", aliases=["r"])
    @AdventOfCodePermission.role_write.check
    async def aoc_role_rank(self, ctx: Context, rank: int):
        """
        set the minimum rank users need to get the role
        """

        if not 1 <= rank <= 200:
            raise CommandError(t.invalid_rank)

        await AdventOfCodeSettings.rank.set(rank)
        # disable_hook avoids running update_roles twice if a refresh happens.
        await self.update_roles(await AOCConfig.get_leaderboard(disable_hook=True))

        await reply(ctx, t.rank_set)
        await send_to_changelog(ctx.guild, t.log_rank_set(rank))

    @aoc.command(name="publish")
    async def aoc_publish(self, ctx: Context, url: str):
        """
        publish a github repository containing solutions for the current advent of code round
        """

        if not await db.get(AOCLink, discord_id=ctx.author.id):
            raise CommandError(t.not_verified)

        # Only URLs that could be resolved and that are at most 128 characters
        # long are accepted.
        url: Optional[str] = get_git_repo(url)
        if not url or len(url) > 128:
            raise CommandError(t.invalid_url)

        await AOCLink.publish(ctx.author.id, url)
        await reply(ctx, t.published)

    @aoc.command(name="unpublish")
    async def aoc_unpublish(self, ctx: Context):
        """
        unpublish a previously published repository
        """

        link: Optional[AOCLink] = await db.get(AOCLink, discord_id=ctx.author.id)
        if not link:
            raise CommandError(t.not_verified)
        if not link.solutions:
            raise CommandError(t.not_published)

        await AOCLink.unpublish(ctx.author.id)
        await reply(ctx, t.unpublished)

    @aoc.command(name="solutions", aliases=["repos"])
    async def aoc_solutions(self, ctx: Context):
        """
        list published solution repositories
        """

        embed = Embed(title=t.solutions, colour=Colors.AdventOfCode)
        members = (await AOCConfig.get_leaderboard())["members"]
        out = []
        async for link in await db.stream(select(AOCLink)):  # type: AOCLink
            # Only links with a published repository whose AoC user is still
            # on the leaderboard are listed.
            if not link.solutions or link.aoc_id not in members:
                continue

            user, repo = parse_git_url(link.solutions)
            out.append(f"<@{link.discord_id}> ({members[link.aoc_id]['name']}): [[{user}/{repo}]]({link.solutions})")

        if not out:
            embed.description = t.no_solutions
            embed.colour = Colors.error
        else:
            embed.description = "\n".join(out)

        await send_long_embed(ctx, embed)