tctree333/Bird-ID

View on GitHub
bot/cogs/stats.py

Summary

Maintainability
B
6 hrs
Test Coverage
# stats.py | commands for bot statistics
# Copyright (C) 2019-2021  EraserBird, person_v1.32, hmmm

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import datetime
from io import BytesIO, StringIO
from typing import Literal

import discord
import numpy as np
import pandas as pd
from discord import app_commands
from discord.ext import commands

from bot.data import database, logger
from bot.functions import CustomCooldown, send_leaderboard, fetch_get_user


class Stats(commands.Cog):
    def __init__(self, bot):
        """Store the bot instance this cog is attached to.

        The reference is used later to resolve users (``convert_users``) and
        to enumerate the channels visible to the bot (``stats``).
        """
        self.bot = bot

    @staticmethod
    def generate_series(database_key):
        """Generates a pandas.Series from a Redis sorted set.

        The members of the sorted set stored at ``database_key`` become the
        index (decoded as UTF-8) and their scores, truncated to ``int``,
        become the values. Entries are requested with ``zrevrangebyscore``
        over the full score range, so they arrive from highest to lowest score.

        The dtype is pinned to ``int64`` so that an empty sorted set still
        yields a numeric Series; without it pandas picks a non-numeric default
        dtype for empty input, which breaks numeric operations downstream.
        """
        logger.info("generating series")
        data = database.zrevrangebyscore(database_key, "+inf", "-inf", withscores=True)
        return pd.Series(
            {member.decode("utf-8"): int(score) for member, score in data},
            dtype="int64",
        )

    @staticmethod
    def generate_dataframe(database_keys, titles, index=None):
        """Generates a pandas.DataFrame from multiple Redis sorted sets.

        Every key in ``database_keys`` is read in one pipeline round trip and
        becomes one column, labelled with the matching entry of ``titles``
        (keys and titles are paired positionally). The row labels are the union
        of all members found in any of the sets plus any extra labels passed in
        ``index``. Cells with no score are filled with 0 and the whole frame is
        cast to ``int``.
        """
        pipeline = database.pipeline()
        for sorted_set_key in database_keys:
            pipeline.zrevrangebyscore(sorted_set_key, "+inf", "-inf", withscores=True)
        replies = pipeline.execute()

        # Row labels: every member seen in any reply, plus caller-supplied ones.
        seen_members = {entry[0].decode("utf-8") for reply in replies for entry in reply}
        row_labels = tuple(seen_members.union(set(index or ())))

        columns = {}
        for title, reply in zip(titles, replies):
            scores_by_member = {}
            for entry in reply:
                scores_by_member[entry[0].decode("utf-8")] = int(entry[1])
            columns[title] = pd.Series(data=scores_by_member, index=None)

        frame = pd.DataFrame(index=row_labels, data=columns)
        return frame.fillna(value=0).astype(int)

    async def convert_users(self, df):
        """Converts discord user ids in DataFrames or Series indexes to usernames.

        Each index entry is cast to ``int`` and looked up with
        ``fetch_get_user``. Entries that resolve to ``None`` are labelled
        "User Unavailable"; the rest become ``name#discriminator``. The index
        of ``df`` is replaced in place and the same object is returned.
        """
        usernames = []
        for raw_id in df.index:
            account = await fetch_get_user(int(raw_id), bot=self.bot, member=False)
            usernames.append(
                "User Unavailable"
                if account is None
                else f"{account.name}#{account.discriminator}"
            )

        df.index = usernames
        return df

    # frequency leaderboards: how often commands were used / birds were shown
    @commands.hybrid_command(
        help="- Gives info on command/bird frequencies",
        usage="[command|commands|c  bird|birds|b] [page]",
        aliases=["freq"],
    )
    @commands.check(CustomCooldown(5.0, bucket=commands.BucketType.channel))
    @app_commands.describe(scope="type of frequency", page="page number")
    async def frequency(
        self,
        ctx: commands.Context,
        scope: Literal["commands", "birds", "command", "c", "bird", "b"],
        page: int = 1,
    ):
        # Resolve the requested scope (or one of its aliases) to a sorted-set
        # key and a title, then hand off to send_leaderboard for the given page.
        logger.info("command: frequency")

        # (accepted aliases, database key, leaderboard title)
        leaderboards = (
            (
                ("command", "commands", "c"),
                "frequency.command:global",
                "Most Frequently Used Commands",
            ),
            (
                ("bird", "birds", "b"),
                "frequency.bird:global",
                "Most Frequent Birds",
            ),
        )
        selected = next(
            (board for board in leaderboards if scope in board[0]),
            None,
        )

        if selected is None:
            await ctx.send(
                "**Invalid Scope!**\n*Valid Scopes:* `commands, birds`", ephemeral=True
            )
            return

        _, database_key, title = selected
        await send_leaderboard(ctx, title, page, database_key)

    # give bot stats
    @commands.hybrid_command(
        help="- Gives statistics on different topics",
        usage="[topic]",
        aliases=["stat"],
    )
    @commands.check(CustomCooldown(5.0, bucket=commands.BucketType.channel))
    @app_commands.describe(topic="stats on what?")
    async def stats(
        self,
        ctx: commands.Context,
        topic: Literal[
            "scores", "usage", "web", "help", "score", "s", "u", "w"
        ] = "help",
    ):
        # Normalizes the topic alias, builds a "Bot Stats" embed with the
        # fields for that topic (one private helper per topic) and sends it.
        logger.info("command: stats")

        canonical_topics = {
            "scores": "scores",
            "score": "scores",
            "s": "scores",
            "usage": "usage",
            "u": "usage",
            "web": "web",
            "w": "web",
            "help": "help",
            "": "help",
        }
        canonical = canonical_topics.get(topic)
        if canonical is None:
            valid_topics = ("help", "scores", "usage", "web")
            await ctx.send(
                f"**`{topic}` is not a valid topic!**\nValid Topics: `{'`, `'.join(valid_topics)}`",
                ephemeral=True,
            )
            return

        # Only done for interaction-based invocations.
        # NOTE(review): presumably this defers the interaction while the
        # statistics are computed - confirm against the discord.py docs.
        if ctx.interaction is not None:
            await ctx.typing()

        embed = discord.Embed(
            title="Bot Stats",
            type="rich",
            color=discord.Color.blue(),
        )

        builders = {
            "help": self._stats_help,
            "scores": self._stats_scores,
            "usage": self._stats_usage,
            "web": self._stats_web,
        }
        builders[canonical](embed)

        await ctx.send(embed=embed)
        return

    @staticmethod
    def _stats_ratio(part, whole):
        """Return ``part / whole``, or 0.0 when ``whole`` is zero."""
        return part / whole if whole else 0.0

    @staticmethod
    def _stats_help(embed):
        """Fill ``embed`` with the list of available statistic topics."""
        embed.description = (
            "**Available statistic topics.**\n"
            + "This command is in progress and more stats may be added. "
            + "If there is a statistic you would like to see here, "
            + "please let us know in the support server."
        )
        embed.add_field(
            name="Scores",
            value="`b!stats [scores|score|s]`\n*Displays stats about scores.*",
        ).add_field(
            name="Usage",
            value="`b!stats [usage|u]`\n*Displays stats about usage.*",
        ).add_field(
            name="Web",
            value="`b!stats [web|w]`\n*Displays stats about web usage.*",
        )

    def _stats_scores(self, embed):
        """Fill ``embed`` with totals, averages and a distribution of positive user scores."""
        embed.description = "**Score Statistics**"
        scores = self.generate_series("users:global")
        if not scores.empty:
            scores = scores[scores > 0]
        if scores.empty:
            # Nothing to aggregate: every ratio below would divide by zero.
            embed.add_field(
                name="Totals",
                inline=False,
                value="*No positive user scores have been recorded yet.*",
            )
            return

        # Ten buckets of width 100 covering 0-1000; scores above 1000 fall
        # outside every bucket but still count towards the denominator.
        counts, edges = np.histogram(
            scores, bins=range(0, 1100, 100), range=(0, 1000)
        )
        shares = (counts / len(scores) * 100).round(1)
        mean_score = scores.mean()
        above_mean = int((scores > mean_score).sum())

        embed.add_field(
            name="Totals",
            inline=False,
            value="**Sum of top 10 user scores:** `{:,}`\n".format(
                scores.nlargest(n=10).sum()
            )
            + "**Sum of all positive user scores:** `{:,}`\n".format(scores.sum()),
        ).add_field(
            name="Computations",
            inline=False,
            value="**Mean of all positive user scores:** `{:,.2f}`\n".format(
                mean_score
            )
            + "**Median of all positive user scores:** `{:,.1f}`\n".format(
                scores.median()
            ),
        ).add_field(
            name="Distributions",
            inline=False,
            value=f"**Number of users with scores over mean:** `{above_mean}`\n"
            + "**Percentage of users with scores over mean:** `{:.1%}`".format(
                above_mean / len(scores)
            )
            + "\n**Percentage of users with scores between:**\n"
            + "".join(
                f"\u2192 *{low}-{high - 1}*: `{share}%`\n"  # \u2192 is the "Rightwards Arrow"
                for low, high, share in zip(edges, edges[1:], shares)
            ),
        )

    def _stats_usage(self, embed):
        """Fill ``embed`` with daily/weekly/monthly/total usage derived from score keys."""
        embed.description = "**Usage Statistics**"

        utc_today = datetime.datetime.now(datetime.timezone.utc).date()
        past_month = pd.date_range(  # pylint: disable=no-member
            utc_today - datetime.timedelta(29), utc_today
        ).date
        keys = ["users:global"] + [f"daily.score:{str(date)}" for date in past_month]
        # Columns are labelled by "days ago": 31 is the all-time totals column,
        # 30 is the oldest day and 1 is today.
        frame = self.generate_dataframe(keys, range(31, 0, -1))
        total = frame.loc[:, 31]
        month = frame.loc[:, 30:1]  # remove totals column
        month = month.loc[(month != 0).any(axis=1)]  # remove users with all 0s
        week = month.loc[:, 7:1]  # generate week from month
        week = week.loc[(week != 0).any(axis=1)]
        today = week.loc[:, 1]  # generate today from week
        today = today.loc[today != 0]

        channels_see = len(list(self.bot.get_all_channels()))
        channels_used = int(database.zcard("score:global"))
        accounts_correct = int((total > 0).sum())

        periods = (
            ("Today (Since midnight UTC)", len(today), today.sum()),
            ("Last 7 Days", len(week), week.sum().sum()),
            ("Last 30 Days", len(month), month.sum().sum()),
        )
        for name, accounts, correct in periods:
            embed.add_field(
                name=name,
                inline=False,
                value="**Accounts that answered at least 1 correctly:** `{:,}`\n".format(
                    accounts
                )
                + "**Total birds answered correctly:** `{:,}`\n".format(correct),
            )

        # Ratios go through _stats_ratio so an empty database or a bot that
        # sees no channels reports 0.0% instead of raising ZeroDivisionError.
        embed.add_field(
            name="Total",
            inline=False,
            value="**Channels the bot can see:** `{:,}`\n".format(channels_see)
            + "**Channels that have used the bot at least once:** `{:,} ({:,.1%})`\n".format(
                channels_used, self._stats_ratio(channels_used, channels_see)
            )
            + "*(Note: Deleted channels or channels that the bot can't see anymore are still counted).*\n"
            + "**Accounts that have used any command at least once:** `{:,}`\n".format(
                len(total)
            )
            + "**Accounts that answered at least 1 correctly:** `{:,} ({:,.1%})`\n".format(
                accounts_correct, self._stats_ratio(accounts_correct, len(total))
            ),
        )

    def _stats_web(self, embed):
        """Fill ``embed`` with daily/weekly/monthly/all-time web score and command usage."""
        embed.description = "**Web Usage Statistics**"

        tracked = ("check", "skip", "hint")

        def drop_all_zero_rows(frame):
            # Keep only rows (accounts) with at least one non-zero day.
            return frame.loc[(frame != 0).any(axis=1)]

        def all_time(pattern, index=None):
            # One column per stored day, labelled with the date part of the key.
            keys = sorted(
                key.decode("utf-8")
                for key in database.scan_iter(match=pattern, count=5000)
            )
            titles = [key.split(":")[1] for key in keys]
            return self.generate_dataframe(keys, titles, index=index)

        def command_totals(usage):
            # A Series (today) already holds one count per command; a
            # DataFrame needs its day columns summed per command.
            if isinstance(usage, pd.Series):
                return {name: usage.loc[name] for name in tracked}
            return {name: usage.loc[name].sum() for name in tracked}

        utc_today = datetime.datetime.now(datetime.timezone.utc).date()
        past_month = pd.date_range(  # pylint: disable=no-member
            utc_today - datetime.timedelta(29), utc_today
        ).date
        web_score_keys = [f"daily.webscore:{str(date)}" for date in past_month]
        web_usage_keys = [f"daily.web:{str(date)}" for date in past_month]
        # label columns by # days ago, today is 1 day ago
        titles = tuple(range(30, 0, -1))

        # The month and all-time frames are filtered like the week/today ones
        # so "answered at least 1 correctly" never counts all-zero rows.
        web_score_month = drop_all_zero_rows(
            self.generate_dataframe(web_score_keys, titles)
        )
        web_score_week = drop_all_zero_rows(web_score_month.loc[:, 7:1])
        web_score_today = web_score_week.loc[:, 1]
        web_score_today = web_score_today.loc[web_score_today != 0]

        web_usage_month = self.generate_dataframe(
            web_usage_keys, titles, index=tracked
        )
        web_usage_week = web_usage_month.loc[:, 7:1]
        web_usage_today = web_usage_week.loc[:, 1]

        web_score_total = drop_all_zero_rows(all_time("daily.webscore:????-??-??"))
        web_usage_total = all_time("daily.web:????-??-??", index=tracked)

        periods = (
            (
                "Today (Since midnight UTC)",
                len(web_score_today),
                web_score_today.sum(),
                web_usage_today,
            ),
            (
                "Last 7 Days",
                len(web_score_week),
                web_score_week.sum().sum(),
                web_usage_week,
            ),
            (
                "Last 30 Days",
                len(web_score_month),
                web_score_month.sum().sum(),
                web_usage_month,
            ),
            (
                "Total",
                len(web_score_total),
                web_score_total.sum().sum(),
                web_usage_total,
            ),
        )
        for name, accounts, correct, usage in periods:
            used = command_totals(usage)
            embed.add_field(
                name=name,
                inline=False,
                value="**Accounts that answered at least 1 correctly:** `{:,}`\n".format(
                    accounts
                )
                + "**Total birds answered correctly:** `{:,}`\n".format(correct)
                + "**Check command usage:** `{:,}`\n".format(used["check"])
                + "**Skip command usage:** `{:,}`\n".format(used["skip"])
                + "**Hint command usage:** `{:,}`\n".format(used["hint"]),
            )

    # export data as csv
    @commands.hybrid_command(help="- Exports bot data as a csv")
    @commands.check(CustomCooldown(60.0, bucket=commands.BucketType.channel))
    async def export(self, ctx: commands.Context):
        # Builds one CSV attachment per data set (command/bird frequencies,
        # streaks, missed birds, scores, web scores, web usage) and sends them
        # all in a single message.
        logger.info("command: export")

        files = []

        def _dated_keys(prefix: str):
            # Sorted, decoded names of every "<prefix>:YYYY-MM-DD" key.
            return sorted(
                key.decode("utf-8")
                for key in database.scan_iter(
                    match=f"{prefix}:????-??-??", count=5000
                )
            )

        def _date_titles(keys):
            # Comma-joined date parts of the keys, used as CSV column headers.
            return ",".join(key.split(":")[1] for key in keys)

        async def _export_helper(
            database_keys, header: str, filename: str, users=False
        ):
            # A single key (string or one-element list) is exported as a
            # Series. Anything else, including an empty list when no dated
            # keys exist yet, goes through generate_dataframe, so an empty
            # scan yields an empty CSV instead of an IndexError.
            if isinstance(database_keys, str):
                data = self.generate_series(database_keys)
            elif len(database_keys) == 1:
                data = self.generate_series(database_keys[0])
            else:
                data = self.generate_dataframe(
                    database_keys, header.strip().split(",")[1:]
                )
            if users:
                data = await self.convert_users(data)
            with StringIO() as f:
                f.write(header)
                data.to_csv(f, header=False)
                payload = f.getvalue().encode("utf-8")
            # The buffer must stay open until ctx.send() reads it, so it is
            # handed to discord.File instead of being scoped by a with-block.
            files.append(discord.File(BytesIO(payload), filename))

        if ctx.interaction is not None:
            await ctx.typing()

        logger.info("exporting freq command")
        await _export_helper(
            "frequency.command:global",
            "command,amount used\n",
            "command_frequency.csv",
            users=False,
        )

        logger.info("exporting freq bird")
        await _export_helper(
            "frequency.bird:global",
            "bird,amount seen\n",
            "bird_frequency.csv",
            users=False,
        )

        logger.info("exporting streaks")
        await _export_helper(
            ["streak:global", "streak.max:global"],
            "username#discrim,current streak,max streak\n",
            "streaks.csv",
            True,
        )

        logger.info("exporting missed")
        keys = _dated_keys("daily.incorrect")
        titles = _date_titles(keys)
        await _export_helper(
            ["incorrect:global"] + keys,
            f"bird name,total missed,{titles}\n",
            "missed.csv",
            users=False,
        )

        logger.info("exporting scores")
        keys = _dated_keys("daily.score")
        titles = _date_titles(keys)
        await _export_helper(
            ["users:global"] + keys,
            f"username#discrim,total score,{titles}\n",
            "scores.csv",
            users=True,
        )

        logger.info("exporting web scores")
        keys = _dated_keys("daily.webscore")
        titles = _date_titles(keys)
        await _export_helper(
            keys, f"username#discrim,{titles}\n", "web_scores.csv", users=True
        )

        logger.info("exporting web usage")
        keys = _dated_keys("daily.web")
        titles = _date_titles(keys)
        await _export_helper(keys, f"command,{titles}\n", "web_usage.csv", users=False)

        await ctx.send(files=files)


async def setup(bot):
    """Extension entry point: create the Stats cog and add it to ``bot``."""
    stats_cog = Stats(bot)
    await bot.add_cog(stats_cog)