# bot/cogs/score.py
# score.py | commands to show score related things
# Copyright (C) 2019-2021 EraserBird, person_v1.32, hmmm
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import datetime
from typing import Literal, Optional, Union
import discord
import pandas as pd
from discord import app_commands
from discord.ext import commands
from discord.utils import escape_markdown as esc
from bot.data import GenericError, database, logger
from bot.functions import CustomCooldown, send_leaderboard, fetch_get_user
class Score(commands.Cog):
    """Commands that report scores, streaks, and leaderboards.

    Every figure is read from the module-level ``database`` object using
    sorted-set style calls (``zscore``, ``zcard``, ``zrevrange`` ...).
    """

    def __init__(self, bot):
        self.bot = bot

    @staticmethod
    def _server_total(ctx: commands.Context):
        """Return the sum of the "score:global" values of a guild's channels.

        The channel list is read from the ``channels:<guild id>`` set, so
        ``ctx.guild`` must not be None (callers check for DMs first).
        """
        logger.info("fetching server totals")
        channels = [
            channel.decode("utf-8")
            for channel in database.smembers(f"channels:{ctx.guild.id}")
        ]
        pipe = database.pipeline()  # use a pipeline to get all the scores
        for channel in channels:
            pipe.zscore("score:global", channel)
        # a channel with no stored score yields a falsy result; count it as 0
        return int(sum(score or 0 for score in pipe.execute()))

    @staticmethod
    def _monthly_lb(category):
        """Add up the per-day sorted sets of the last 30 days.

        Args:
            category: "scores" reads the ``daily.score:<date>`` keys,
                "missed" reads the ``daily.incorrect:<date>`` keys.

        Returns:
            A pandas Series indexed by the decoded member name, holding the
            summed values, sorted from highest to lowest.

        Raises:
            GenericError: (code 990) if ``category`` is anything else.
        """
        logger.info("generating monthly leaderboard")
        if category == "scores":
            key = "daily.score"
        elif category == "missed":
            key = "daily.incorrect"
        else:
            raise GenericError("Invalid category", 990)

        # today (UTC) plus the 29 days before it -> 30 dates
        today = datetime.datetime.now(datetime.timezone.utc).date()
        past_month = pd.date_range(  # pylint: disable=no-member
            today - datetime.timedelta(29), today
        ).date

        pipe = database.pipeline()
        for day in past_month:
            pipe.zrevrangebyscore(f"{key}:{day}", "+inf", "-inf", withscores=True)
        result = pipe.execute()

        totals = pd.Series(dtype="int64")
        for daily_score in result:
            # explicit dtype keeps days with no entries from producing an
            # untyped empty Series
            daily_series = pd.Series(
                {member.decode("utf-8"): int(value) for member, value in daily_score},
                dtype="int64",
            )
            totals = totals.add(daily_series, fill_value=0)
        return totals.sort_values(ascending=False)

    @staticmethod
    def _server_lb(guild_id):
        """Return the "users:global" scores of one guild's users.

        The user ids come from the ``users.server.id:<guild_id>`` set.

        Returns:
            An int64 pandas Series indexed by user id (str), sorted from
            highest to lowest score.
        """
        logger.info("generating server leaderboard")
        users = tuple(
            member.decode("utf8")
            for member in database.smembers(f"users.server.id:{guild_id}")
        )
        pipe = database.pipeline()
        for user in users:
            pipe.zscore("users:global", user)
        # zscore gives None for an id without a global score; int(None) would
        # raise, so such users are listed with 0 instead
        scores = [int(score or 0) for score in pipe.execute()]
        return pd.Series(scores, index=users, dtype="int64").sort_values(
            ascending=False
        )

    @staticmethod
    def _resolve_scope_and_page(scope, page, default_scope):
        """Untangle the optional ``[scope] [page]`` argument pair.

        If ``scope`` parses as an integer it is taken to be the page number
        and ``default_scope`` is used. Otherwise ``scope`` is lower-cased
        (an empty string becomes ``default_scope``) and ``page`` is kept.

        Returns:
            A ``(scope, page)`` tuple.
        """
        try:
            page = int(scope)
        except ValueError:
            return (scope.lower() if scope else default_scope), page
        return default_scope, page

    @staticmethod
    async def _user_label(ctx, user_id):
        """Return the bold leaderboard label for ``user_id``.

        A member lookup is tried first when the command was used in a guild;
        if it succeeds the label also carries a mention. Otherwise a plain
        user lookup is tried, and "**Deleted**" is used if that fails too.
        """
        member = None
        if ctx.guild is not None:
            member = await fetch_get_user(user_id, ctx=ctx, member=True)
        if member is not None:
            return f"**{esc(member.name)}#{member.discriminator}** ({member.mention})"

        user = await fetch_get_user(user_id, ctx=ctx, member=False)
        if user is None:
            return "**Deleted**"
        return f"**{esc(user.name)}#{user.discriminator}**"

    @staticmethod
    async def user_lb(ctx, title, page, database_key=None, data=None):
        """Send one page (10 rows) of a user leaderboard as an embed.

        Exactly one of ``database_key`` and ``data`` must be given.

        Args:
            ctx: context to reply to; ``ctx.author`` gets a "You:" summary.
            title: name of the embed field holding the leaderboard rows.
            page: 1-based page number; values below 1 are treated as 1 and
                values past the end show the last page.
            database_key: sorted-set key to read the leaderboard from.
            data: pandas Series (index: user id str, value: score) that is
                already sorted from highest to lowest.

        Raises:
            GenericError: (code 990) if both or neither source is given.
        """
        if database_key is None and data is None:
            raise GenericError("database_key and data are both NoneType", 990)
        if database_key is not None and data is not None:
            raise GenericError("database_key and data are both set", 990)

        use_database = database_key is not None
        users_per_page = 10

        user_amount = (
            int(database.zcard(database_key)) if use_database else data.count()
        )
        if user_amount == 0:
            logger.info(f"no users in {database_key}")
            await ctx.send("There are no users in the database.")
            return

        # zero-based index of the first row on the requested page
        offset = (max(1, page) - 1) * users_per_page
        if offset >= user_amount:
            # past the end: fall back to the first row of the last page
            offset = ((user_amount - 1) // users_per_page) * users_per_page

        if use_database:
            leaderboard_list = database.zrevrangebyscore(
                database_key, "+inf", "-inf", offset, users_per_page, True
            )
        else:
            # slice end is exclusive, so this yields users_per_page rows
            leaderboard_list = data.iloc[offset : offset + users_per_page].items()

        embed = discord.Embed(type="rich", colour=discord.Color.blurple())
        embed.set_author(name="Bird ID - An Ornithology Bot")

        rows = []
        for position, (user_id, value) in enumerate(leaderboard_list, start=offset + 1):
            user_info = await Score._user_label(ctx, int(user_id))
            rows.append(f"{position}. {user_info} - {int(value)}\n")
        embed.add_field(name=title, value="".join(rows), inline=False)

        author_id = str(ctx.author.id)
        user_score = (
            database.zscore(database_key, author_id)
            if use_database
            else data.get(author_id)
        )
        if user_score is None:
            embed.add_field(name="You:", value="You haven't answered any correctly.")
            await ctx.send(embed=embed)
            return

        if use_database:
            placement = int(database.zrevrank(database_key, author_id)) + 1
        else:
            placement = int(data.rank(ascending=False)[author_id])

        if placement == 1:
            standing = "You are in first place."
        else:
            # only look up the entry one rank above when there is one
            if use_database:
                distance = int(
                    database.zrevrange(
                        database_key, placement - 2, placement - 2, True
                    )[0][1]
                ) - int(user_score)
            else:
                distance = int(data.iloc[placement - 2] - user_score)

            if distance == 0:
                standing = f"You are tied with #{placement-1}"
            else:
                standing = f"You are {distance} away from #{placement-1}"

        embed.add_field(
            name="You:",
            value=f"You are #{placement} on the leaderboard.\n{standing}",
            inline=False,
        )
        await ctx.send(embed=embed)

    # score - total number of correct answers so far, for the current
    # channel (default) or summed over the whole server
    @commands.hybrid_command(
        brief="- Total correct answers in a channel or server",
        help="- Total correct answers in a channel or server. Defaults to channel.",
        usage="[server|s]",
    )
    @commands.check(CustomCooldown(8.0, bucket=commands.BucketType.channel))
    @app_commands.describe(scope="server or channel")
    async def score(
        self,
        ctx: commands.Context,
        scope: Literal["server", "channel", "s", "c"] = "channel",
    ):
        logger.info("command: score")

        wants_server = scope in ("server", "s")
        if wants_server and ctx.guild is None:
            # there is no guild in a DM, so the server total cannot be computed
            logger.info("dm context")
            await ctx.send(
                "**Server scopes are not available in DMs.**\n*Showing channel total instead.*"
            )
            wants_server = False

        if wants_server:
            total_correct = self._server_total(ctx)
            place = "server"
        else:
            # a channel without a stored score counts as 0
            total_correct = int(
                database.zscore("score:global", str(ctx.channel.id)) or 0
            )
            place = "channel"

        await ctx.send(
            f"Wow, looks like a total of `{total_correct}` birds have been answered correctly in this **{place}**!\n"
            + "Good job everyone!"
        )

    # userscore - correct answers by a user (the author when nobody is given)
    @commands.hybrid_command(
        brief="- How many correct answers given by a user",
        help="- Gives the amount of correct answers by a user.\n"
        + "Mention someone to get their score, don't mention anyone to get your score.",
        aliases=["us"],
    )
    @commands.check(CustomCooldown(5.0, bucket=commands.BucketType.user))
    @app_commands.describe(user="The user to check scores for")
    async def userscore(
        self,
        ctx: commands.Context,
        *,
        user: Optional[Union[discord.Member, discord.User]] = None,
    ):
        logger.info("command: userscore")

        if user is None:
            mention = f"<@{ctx.author.id}>"
            # an author without a stored score is shown as 0
            score = int(database.zscore("users:global", str(ctx.author.id)) or 0)
        else:
            logger.info(user.id)
            score = database.zscore("users:global", str(user.id))
            if score is None:
                await ctx.send(
                    "This user does not exist on our records!", ephemeral=True
                )
                return
            score = int(score)
            mention = f"<@{user.id}>"

        embed = discord.Embed(type="rich", colour=discord.Color.blurple())
        embed.set_author(name="Bird ID - An Ornithology Bot")
        embed.add_field(
            name="User Score:", value=f"{mention} has answered correctly {score} times."
        )
        await ctx.send(embed=embed)

    # streak - current and max streak of a user (the author when nobody is
    # given); also the parent group of "streak leaderboard"
    @commands.hybrid_group(
        help="- Gives your current/max streak",
        usage="[user]",
        aliases=["streaks", "stk"],
        fallback="self",
    )
    @commands.check(CustomCooldown(5.0, bucket=commands.BucketType.user))
    async def streak(self, ctx: commands.Context):
        if ctx.invoked_subcommand is not None:
            # a subcommand handles this invocation
            return
        logger.info("command: streak")

        # everything after the first word of the message is the user argument
        args = " ".join(ctx.message.content.split(" ")[1:])
        user = await commands.MemberConverter().convert(ctx, args) if args else None

        if user is None:
            author_id = str(ctx.author.id)
            mention = f"<@{ctx.author.id}>"
            # an author without stored streaks is shown as 0
            current = int(database.zscore("streak:global", author_id) or 0)
            best = int(database.zscore("streak.max:global", author_id) or 0)
        else:
            logger.info(user.id)
            current = database.zscore("streak:global", str(user.id))
            best = database.zscore("streak.max:global", str(user.id))
            if current is None or best is None:
                await ctx.send(
                    "This user does not exist on our records!", ephemeral=True
                )
                return
            current = int(current)
            best = int(best)
            mention = f"<@{user.id}>"

        embed = discord.Embed(
            type="rich", colour=discord.Color.blurple(), title="**User Streaks**"
        )
        embed.set_author(name="Bird ID - An Ornithology Bot")
        embed.add_field(
            name="**Current Streak**",
            value=f"{mention} has answered `{current}` in a row!",
            inline=False,
        )
        embed.add_field(
            name="**Max Streak**",
            value=f"{mention}'s max was `{best}` in a row!",
            inline=False,
        )
        await ctx.send(embed=embed)

    # streak leaderboard - returns top streaks, current (default) or max
    @streak.command(
        brief="- Top streaks",
        help="- Top streaks, either current (default) or max.",
        usage="[max|m] [page]",
        name="leaderboard",
        aliases=["lb", "top"],
    )
    @commands.check(CustomCooldown(5.0, bucket=commands.BucketType.user))
    @app_commands.describe(scope="current or max streaks", page="The page to view")
    async def streak_leaderboard(
        self,
        ctx: commands.Context,
        scope: Literal["current", "max", "now", "c", "m"] = "current",
        page: int = 1,
    ):
        logger.info("command: streak leaderboard")

        scope, page = self._resolve_scope_and_page(scope, page, "current")
        logger.info(f"scope: {scope}")
        logger.info(f"page: {page}")

        if scope not in ("current", "now", "c", "max", "m"):
            logger.info("invalid scope")
            await ctx.send(
                f"**{scope} is not a valid scope!**\n*Valid Scopes:* `current, max`",
                ephemeral=True,
            )
            return

        if scope in ("max", "m"):
            database_key = "streak.max:global"
            scope = "max"
        else:
            database_key = "streak:global"
            scope = "current"

        if ctx.interaction is not None:
            await ctx.typing()

        await self.user_lb(
            ctx, f"Streak Leaderboard ({scope})", page, database_key, None
        )

    # leaderboard - returns top users by score: global, server, or monthly
    @commands.hybrid_command(
        brief="- Top scores",
        help="- Top scores, either global, server, or monthly.",
        usage="[global|g server|s month|monthly|m] [page]",
        aliases=["lb"],
    )
    @commands.check(CustomCooldown(5.0, bucket=commands.BucketType.user))
    @app_commands.describe(
        scope="global, server, or monthly scores", page="The page to view"
    )
    async def leaderboard(
        self,
        ctx: commands.Context,
        scope: Literal[
            "global", "server", "monthly", "month", "m", "g", "s"
        ] = "global",
        page: int = 1,
    ):
        logger.info("command: leaderboard")

        scope, page = self._resolve_scope_and_page(scope, page, "global")
        logger.info(f"scope: {scope}")
        logger.info(f"page: {page}")

        if scope not in ("global", "server", "month", "monthly", "m", "g", "s"):
            logger.info("invalid scope")
            await ctx.send(
                f"**{scope} is not a valid scope!**\n*Valid Scopes:* `global, server, month`",
                ephemeral=True,
            )
            return

        if ctx.interaction is not None:
            await ctx.typing()

        # default: the global sorted set; the branches below override it
        database_key = "users:global"
        data = None
        label = "global"

        if scope in ("server", "s"):
            if ctx.guild is not None:
                database_key = None
                data = self._server_lb(ctx.guild.id)
                label = "server"
            else:
                logger.info("dm context")
                await ctx.send(
                    "**Server scopes are not available in DMs.**\n*Showing global leaderboard instead.*"
                )
        elif scope in ("month", "monthly", "m"):
            database_key = None
            data = self._monthly_lb("scores")
            label = "Last 30 Days"

        await self.user_lb(ctx, f"Leaderboard ({label})", page, database_key, data)

    # missed - returns top missed birds: global, server, personal, or monthly
    @commands.hybrid_command(
        brief="- Top incorrect birds",
        help="- Top incorrect birds, either global, server, personal, or monthly.",
        usage="[global|g server|s me|m month|monthly|mo] [page]",
        aliases=["m"],
    )
    @commands.check(CustomCooldown(5.0, bucket=commands.BucketType.user))
    @app_commands.describe(
        scope="global, server, personal, or monthly", page="The page to view"
    )
    async def missed(
        self,
        ctx: commands.Context,
        scope: Literal[
            "global", "server", "me", "monthly", "month", "mo", "g", "s", "m"
        ] = "global",
        page: int = 1,
    ):
        logger.info("command: missed")

        scope, page = self._resolve_scope_and_page(scope, page, "global")
        logger.info(f"scope: {scope}")
        logger.info(f"page: {page}")

        valid_scopes = (
            "global",
            "server",
            "me",
            "month",
            "monthly",
            "mo",
            "g",
            "s",
            "m",
        )
        if scope not in valid_scopes:
            logger.info("invalid scope")
            await ctx.send(
                f"**{scope} is not a valid scope!**\n*Valid Scopes:* `global, server, me, month`",
                ephemeral=True,
            )
            return

        if ctx.interaction is not None:
            await ctx.typing()

        # default: the global sorted set; the branches below override it
        database_key = "incorrect:global"
        data = None
        label = "global"

        if scope in ("server", "s"):
            if ctx.guild is not None:
                database_key = f"incorrect.server:{ctx.guild.id}"
                label = "server"
            else:
                logger.info("dm context")
                await ctx.send(
                    "**Server scopes are not available in DMs.**\n*Showing global leaderboard instead.*"
                )
        elif scope in ("me", "m"):
            database_key = f"incorrect.user:{ctx.author.id}"
            label = "me"
        elif scope in ("month", "monthly", "mo"):
            database_key = None
            data = self._monthly_lb("missed")
            label = "Last 30 days"

        await send_leaderboard(
            ctx, f"Top Missed Birds ({label})", page, database_key, data
        )
async def setup(bot):
    """Extension entry point: create the Score cog and add it to ``bot``."""
    score_cog = Score(bot)
    await bot.add_cog(score_cog)