cogs/owner.py
# Copyright 2019 Allan Galarza
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
import os
import platform
import textwrap
import traceback
from contextlib import redirect_stdout
from distutils.version import StrictVersion
import pkg_resources
# Exposing for /debug command
from .utils.database import get_affected_count
from nabbot import NabBot
from .utils import *
from .utils import checks
from .utils.context import NabCtx
from .utils.messages import *
from .utils.errors import *
from .utils.timing import *
from .utils.pages import Pages
from .utils.errors import CannotPaginate
from .utils.tibia import *
log = logging.getLogger("nabbot")
req_pattern = re.compile(r"([\w.]+)([><=]+)([\d.]+),([><=]+)([\d.]+)")
class Owner(commands.Cog, CogUtils):
"""Commands exclusive to bot owners"""
def __init__(self, bot: NabBot):
self.bot = bot
self._last_result = None
self.sessions = set()
def cog_unload(self):
log.info(f"{self.tag} Unloading cog")
# region Commands
@commands.command(aliases=["notifyadmins"])
@checks.owner_only()
async def admins_message(self, ctx: NabCtx, *, content: str = None):
"""Sends a private message to all server owners.
Notifies all the owners of the servers where the bot is in.
If no message is specified at first, the bot will ask for a message to send.
The message contains a signature to indicate who wrote the message.
"""
if content is None:
await ctx.send("Tell me the message you want to send to server admins.\nReply `cancel/none` to cancel.")
def check(m):
return m.channel == ctx.channel and m.author == ctx.author
try:
answer = await self.bot.wait_for("message", timeout=60.0, check=check)
if answer.content.lower().strip() in ["cancel", "none"]:
await ctx.send("Nevermind then.")
return
content = answer.content
except asyncio.TimeoutError:
await ctx.send("You changed your mind then?")
return
guild_admins = list(set([g.owner for g in self.bot.guilds]))
for admin in guild_admins:
await admin.send("{0}\n\t-{1.mention}".format(content, ctx.author))
await ctx.send("Message sent to "+join_list(["@"+a.name for a in guild_admins], ", ", " and "))
@checks.owner_only()
@commands.command()
async def editmessage(self, ctx: NabCtx, message_link: str, *, new_content: str):
"""Edits a bot message with new content based on its JSON representation.
JSON based on the [Discord API](https://discordapp.com/developers/docs/resources/channel#embed-object) format.
A visualizer can be seen [here](https://leovoel.github.io/embed-visualizer/).
"""
try:
data = json.loads(self.cleanup_code(new_content))
except json.JSONDecodeError:
return await ctx.error("New content is not a valid json string.")
if not isinstance(data, dict):
return await ctx.error("New content is not a valid json string.")
content = data.get("content")
embed = None
if "embed" in data:
if "timestamp" in data["embed"] and isinstance(data["embed"]["timestamp"], str):
data["embed"]["timestamp"] = data["embed"]["timestamp"].replace('Z', '')
embed = discord.Embed.from_dict(data["embed"])
guild_id, channel_id, message_id = parse_message_link(message_link)
if channel_id is None or message_id is None:
return await ctx.error("That's not a valid message link.")
if guild_id is None:
return await ctx.error("I can't edit private messages.")
guild: discord.Guild = ctx.bot.get_guild(guild_id)
if guild is None:
return await ctx.error("I'm not in the guild the message belongs to.")
channel: discord.TextChannel = guild.get_channel(channel_id)
if channel_id is None:
return await ctx.error("I can't find the channel the message belongs to.")
try:
message: discord.Message = await channel.fetch_message(message_id)
except discord.HTTPException:
return await ctx.error("I couldn't find the message.")
if message.author != ctx.me:
return await ctx.error("I can only edit my own messages.")
try:
await message.edit(content=content, embed=embed)
await ctx.message.add_reaction("✅")
except discord.HTTPException:
return await ctx.error("I could edit the message. Maybe the content is malformed or exceeds limits.")
@checks.owner_only()
@commands.command()
async def announcement(self, ctx: NabCtx, *, message):
"""Sends an announcement to all servers with a serverlog."""
embed = discord.Embed(title="📣 Owner Announcement", colour=discord.Colour.blurple(),
timestamp=dt.datetime.now())
embed.set_author(name="Support Server", url="https://discord.gg/NmDvhpY", icon_url=self.bot.user.avatar_url)
embed.set_footer(text=f"By {ctx.author}", icon_url=get_user_avatar(ctx.author))
embed.description = message
msg = await ctx.send("This message will be sent to all serverlogs. Do you want to send it?", embed=embed)
confirm = await ctx.react_confirm(msg, delete_after=True)
if not confirm:
await ctx.send("Ok then.")
return
count = 0
msg = await ctx.send(f"{config.loading_emoji} Sending messages...")
for guild in self.bot.guilds:
success = await self.bot.send_log_message(guild, embed=embed)
if success:
count += 1
await safe_delete_message(msg)
await ctx.success(f"Message sent to {count:,} servers.")
# noinspection PyBroadException
@checks.owner_only()
@commands.command(name="eval")
async def _eval(self, ctx: NabCtx, *, body: str):
"""Evaluates Python code.
This commands lets you evaluate python code. If no errors are returned, the bot will react to the command call.
To show the result, you have to use `print()`.
Asynchronous functions must be waited for using `await`.
To show the results of the last command, use `print(_)`.
"""
if "os." in body:
await ctx.send("I won't run that.")
return
env = {
"bot": self.bot,
"ctx": ctx,
"channel": ctx.channel,
"author": ctx.author,
"server": ctx.guild,
"guild": ctx.guild,
"message": ctx.message,
"_": self._last_result
}
env.update(globals())
body = self.cleanup_code(body)
stdout = io.StringIO()
to_compile = f"async def func():\n{textwrap.indent(body, ' ')}"
try:
exec(to_compile, env)
except Exception as e:
return await ctx.send(f'```py\n{e.__class__.__name__}: {e}\n```')
func = env["func"]
try:
with redirect_stdout(stdout):
start = time.perf_counter()
ret = await func()
run_time = time.perf_counter()-start
except Exception:
value = stdout.getvalue()
await ctx.send(f'```py\n{value}{traceback.format_exc()}\n```')
else:
value = stdout.getvalue()
try:
await ctx.message.add_reaction(config.true_emoji.replace("<", "").replace(">", ""))
except discord.HTTPException:
pass
embed = discord.Embed(colour=discord.Colour.teal())
embed.set_footer(text=f"Executed in {run_time*1000:,.4f} ms")
embed.set_author(name=ctx.author.name, icon_url=get_user_avatar(ctx.author))
if ret is not None:
self._last_result = ret
if ret is None and value:
embed.title = "Output"
embed.description = f'```py\n{value}\n```'
elif ret and value:
embed.title = "Output"
embed.description = f'```py\n{value}\n```'
embed.add_field(name=f"Result (Type: {type(ret).__name__})", value=f'```py\n{ret}\n```', inline=False)
elif ret and not value:
embed.title = f"Result (Type: {type(ret).__name__})"
embed.description = f'```py\n{ret}\n```'
else:
return
await ctx.send(embed=embed)
@checks.owner_only()
@commands.command(name="invalidworlds")
async def invalid_worlds(self, ctx: NabCtx):
"""Checks if there are any characters in invalid worlds or servers tracking invalid worlds.
They can be fixed by using the merge command to rename them to their corresponding new name."""
async with ctx.pool.acquire() as conn:
invalid = defaultdict(lambda: {"servers": 0, "characters": 0})
# Count servers tracking other worlds
rows = await conn.fetch("SELECT count(*), value as world FROM server_property "
"WHERE key = 'world' AND NOT value = ANY($1) "
"GROUP BY 2", tibia_worlds)
for row in rows:
invalid[row["world"]]["servers"] = row["count"]
# Count characters in other worlds
rows = await conn.fetch('SELECT count(*), world FROM "character" '
'WHERE NOT world = ANY($1) '
'GROUP BY 2', tibia_worlds)
for row in rows:
invalid[row["world"]]["characters"] = row["count"]
entries = [f"**{k}** - {v['servers']} servers, {v['characters']} characters" for k, v in invalid.items()]
pages = Pages(ctx, entries=entries, per_page=10)
pages.embed.title = f"Invalid worlds"
try:
await pages.paginate()
except CannotPaginate as e:
await ctx.error(e)
@commands.command()
@checks.owner_only()
async def leave(self, ctx: NabCtx, *, server: str):
"""Makes the bot leave a server.
The bot will ask for confirmation before leaving the server.
Once the bot has left a server, only a server administrator can add it back.
"""
id_regex = re.compile(r'([0-9]{15,21})$')
match = id_regex.match(server)
if match:
guild = self.bot.get_guild(int(match.group(1)))
if guild is None:
await ctx.error(f"I'm not in any server with the id {server}.")
return
else:
guild = self.bot.get_guild_by_name(server)
if guild is None:
await ctx.error(f"I'm not in any server named {server}")
return
embed = discord.Embed(title=guild.name, timestamp=guild.created_at)
embed.set_footer(text="Created")
embed.set_author(name=guild.owner.name, icon_url=get_user_avatar(guild.owner))
embed.set_thumbnail(url=guild.icon_url)
embed.add_field(name="Members", value=str(guild.member_count))
embed.add_field(name="Joined", value=str(guild.me.joined_at))
message = await ctx.send("Are you sure you want me to leave this server?", embed=embed)
confirm = await ctx.react_confirm(message)
if confirm is None:
await ctx.send("Forget it then.")
return
if confirm is False:
await ctx.send("Ok, I will stay there.")
return
try:
await guild.leave()
await ctx.success(f"I just left the server **{guild.name}**.")
except discord.HTTPException as e:
log.warning(f"{self.tag} Could not leave server: {e}")
await ctx.error("Something went wrong, I guess they don't want to let me go.")
@commands.command(name="load")
@checks.owner_only()
async def load_cog(self, ctx: NabCtx, cog: str):
"""Loads a cog.
If there's an error while compiling, it will be displayed here.
Any cog can be loaded here, including cogs made by user.
When loading and unloading cogs in subdirectories, periods (`.`) are used instead of slashes (`/`).
For example, a cog found in `cogs/tibia.py` would be loaded as `cogs.tibia`.
"""
try:
self.bot.load_extension(cog)
await ctx.send(f"{ctx.tick()} Cog loaded successfully.")
except Exception as e:
await ctx.send('{}: {}'.format(type(e).__name__, e))
@commands.command(name="logs")
@checks.owner_only()
async def logs(self, ctx: NabCtx, log_name: str = None):
base_dir = "logs"
if log_name is None:
def file_size(size):
if size < 1024:
return f"{size:,} B"
size /= 1024
if size < 1024:
return f"{size:,.2f} kB"
size /= 1024
if size < 1024:
return f"{size:,.2f} mB"
entries = []
for log_file in os.listdir(base_dir):
path = os.path.join(base_dir, log_file)
if os.path.isfile(path):
entries.append(f"{log_file} (*{file_size(os.path.getsize(path))}*)")
entries[1:] = sorted(entries[1:], reverse=True)
pages = Pages(ctx, entries=entries, per_page=10)
pages.embed.title = f"Log files"
try:
await pages.paginate()
except CannotPaginate as e:
await ctx.error(e)
return
if log_name and ctx.guild:
return await ctx.error("For security reasons, I can only upload logs on private channels.")
if ".." in log_name:
return await ctx.error("You're not allowed to get files from outside the log folder.")
try:
with open(os.path.join("logs", log_name), "rb") as f:
await ctx.send("Here's your log file", file=discord.File(f, log_name))
except FileNotFoundError:
return await ctx.error("There's no log file with that name.")
except discord.HTTPException:
return await ctx.error("Error uploading file. It is currently not possible to read the current log file.")
@commands.command(usage="<old world> <new world>")
@checks.owner_only()
async def merge(self, ctx: NabCtx, old_world: str, new_world: str):
"""Renames all references of an old world to a new one.
This command should updates all the database entries, changing all references of the old world to the new one
This updates all characters' worlds and discord guild's tracked worlds to the new world.
All the highscores entries of the old world will be deleted.
This should be done immediately after the world merge occurs and not before, or else tracking will stop.
Use this with caution as the damage can be irreversible.
Example: `merge Fidera Gladera`
"""
old_world = old_world.capitalize()
new_world = new_world.capitalize()
message = await ctx.send(f"Are you sure you want to merge **{old_world}** into **{new_world}**?\n"
f"*This will affect all the Discord servers I'm in, and may be irreversible.*")
confirm = await ctx.react_confirm(message)
if confirm is None:
await ctx.send("You took too long!")
return
if not confirm:
await ctx.send("Good, I hate doing that.")
return
async with ctx.pool.acquire() as conn:
result = await conn.execute('UPDATE "character" SET world = $1 WHERE world = $2', new_world, old_world)
affected_chars = get_affected_count(result)
result = await conn.execute("UPDATE server_property SET VALUE = $1 WHERE key = 'world' AND value = $2",
new_world, old_world)
affected_guilds = get_affected_count(result)
await conn.execute("DELETE FROM highscores WHERE world = $1", old_world)
await ctx.send(f"Moved **{affected_chars:,}** characters to {new_world}. "
f"**{affected_guilds}** discord servers were affected.\n\n"
f"Enjoy **{new_world}**! 🔥♋")
await self.bot.reload_worlds()
@commands.command(aliases=["namechange", "rename"], usage="<old name>,<new name>")
@checks.owner_only()
@checks.not_lite_only()
@commands.guild_only()
async def namelock(self, ctx: NabCtx, *, params):
"""Register the name of a new character that was namelocked.
Characters that get namelocked can't be searched by their old name, so they must be reassigned manually.
If the character got a name change (from the store), searching the old name redirects to the new name, so
these are usually reassigned automatically.
In order for the command to work, the following conditions must be met:
- The old name must exist in NabBot's characters database.
- The old name must not be a valid character in Tibia.com
- The new name must be a valid character in Tibia.com
- They must have the same vocation, not considering promotions.
"""
params = params.split(",")
if len(params) != 2:
await ctx.send("The correct syntax is: `/namelock oldname,newname")
return
old_name = params[0]
new_name = params[1]
with ctx.typing():
old_char_db = await DbChar.get_by_name(ctx.pool, old_name)
# If character wasn't registered, there's nothing to do.
if old_char_db is None:
await ctx.error(f"I don't have a character registered with the name: **{old_name}**")
return
# Search old name to see if there's a result
try:
old_char = await get_character(ctx.bot, old_name)
except NetworkError:
await ctx.error("I'm having problem with 'the internet' as you humans say, try again.")
return
# Check if returns a result
if old_char is not None:
if old_name.lower() == old_char.name.lower():
await ctx.error(f"The character **{old_char.name}** wasn't namelocked.")
else:
await ctx.success(f"The character **{old_name}** was renamed to **{old_char.name}**.")
# Renaming is actually done in get_character(), no need to do anything.
return
# Check if new name exists
try:
new_char = await get_character(ctx.bot, new_name)
if new_char is None:
await ctx.error(f"The character **{new_name}** doesn't exist.")
return
except NetworkError:
await ctx.error("I'm having problem with 'the internet' as you humans say, try again.")
return
# Check if vocations are similar
if not (old_char_db.vocation.lower() in new_char.vocation.value.lower()
or new_char.vocation.value.lower() in old_char_db.vocation.lower()):
await ctx.error(f"**{old_char_db.name}** was a *{old_char_db.vocation}* and "
f"**{new_char.name}** is a *{new_char.vocation.value}*. "
f"I think you're making a mistake.")
return
await ctx.send(f"Are you sure **{old_char_db.name}** ({abs(old_char_db.level)} {old_char_db.vocation}) is"
f" **{new_char.name}** ({new_char.level} {new_char.vocation}) now? `yes/no`")
def check(m):
return m.channel == ctx.channel and m.author == ctx.author
try:
reply = await self.bot.wait_for("message", timeout=50.0, check=check)
if reply.content.lower() not in ["yes", "y"]:
await ctx.send("No then? Alright.")
return
except asyncio.TimeoutError:
await ctx.send("No answer? I guess you changed your mind.")
return
# Check if new name was already registered
new_char_db = await DbChar.get_by_name(ctx.pool, new_char.name)
async with ctx.pool.acquire() as conn:
if new_char_db is None:
await old_char_db.update_level(conn, new_char.level)
await old_char_db.update_name(conn, new_char.name)
await old_char_db.update_vocation(conn, new_char.vocation.value)
else:
# Replace new char with old char id and delete old char, reassign deaths and levelups
# TODO: Handle conflicts, specially in deaths
await conn.execute('DELETE FROM "character" WHERE id = $1', old_char_db.id)
await conn.execute('UPDATE "character" SET id = $1 WHERE id = $2',
old_char_db.id, new_char_db.id)
await conn.execute("UPDATE character_death SET id = $1 WHERE id = $2",
old_char_db.id, new_char_db.id)
await conn.execute("UPDATE character_levelup SET id = $1 WHERE id = $2",
old_char_db.id, new_char_db.id)
await ctx.success("Character renamed successfully.")
@checks.owner_only()
@commands.command()
async def messagejson(self, ctx: NabCtx, message_link: str):
"""Shows the json representation of a message.
JSON based on the [Discord API](https://discordapp.com/developers/docs/resources/channel#embed-object) format.
A visualizer can be seen [here](https://leovoel.github.io/embed-visualizer/).
"""
guild_id, channel_id, message_id = parse_message_link(message_link)
if channel_id is None or message_id is None:
return await ctx.error("That's not a valid message link.")
if guild_id is None:
return await ctx.error("I can't check private messages.")
guild: discord.Guild = ctx.bot.get_guild(guild_id)
if guild is None:
return await ctx.error("I'm not in the guild the message belongs to.")
channel: discord.TextChannel = guild.get_channel(channel_id)
if channel_id is None:
return await ctx.error("I can't find the channel the message belongs to.")
try:
message: discord.Message = await channel.fetch_message(message_id)
except discord.HTTPException:
return await ctx.error("I couldn't find the message.")
data = dict()
if message.content:
data["content"] = message.content
if message.embeds:
# We only care about the first, regular embed.
embed: discord.Embed = message.embeds[0]
if embed.type == "rich":
data["embed"] = embed.to_dict()
del data["embed"]["type"]
await ctx.send(f"```json\n{json.dumps(data, indent=1)}```")
@checks.owner_only()
@commands.command()
async def ping(self, ctx: NabCtx):
"""Shows the bot's response times."""
resp = await ctx.send('Pong! Loading...')
diff = resp.created_at - ctx.message.created_at
await resp.edit(content=f'Pong! That took {1000*diff.total_seconds():.1f}ms.\n'
f'Socket latency is {1000*self.bot.latency:.1f}ms')
@checks.owner_only()
@commands.command()
async def sendmessage(self, ctx: NabCtx, channel: Optional[discord.TextChannel] = None, *, json_content: str):
"""Sends a message based on its JSON representation.
JSON based on the [Discord API](https://discordapp.com/developers/docs/resources/channel#embed-object) format.
A visualizer can be seen [here](https://leovoel.github.io/embed-visualizer/)."""
if channel is None:
channel = ctx.channel
try:
data = json.loads(self.cleanup_code(json_content))
except json.JSONDecodeError:
return await ctx.error("Content is not a valid json string.")
if not isinstance(data, dict):
return await ctx.error("Content is not a valid json string.")
content = data.get("content")
embed = None
if "embed" in data:
if "timestamp" in data["embed"] and isinstance(data["embed"]["timestamp"], str):
data["embed"]["timestamp"] = data["embed"]["timestamp"].replace('Z', '')
embed = discord.Embed.from_dict(data["embed"])
try:
await channel.send(content, embed=embed)
await ctx.message.add_reaction("✅")
except discord.Forbidden:
await ctx.error("I don't have the right permissions to send a message there.")
except discord.HTTPException:
await ctx.error("I couldn't send your message, the content might be malformed or exceed limits.")
@checks.owner_only()
@commands.command()
async def purge(self, ctx: NabCtx):
"""Cleans the database from entries of servers that no longer contain NabBot."""
guilds = [g.id for g in ctx.bot.guilds]
msg = await ctx.send("This action will delete configuration of all servers I'm not in.\n"
"This action is irreversible. Make sure you're connected to the correct database and "
f"the correct bot instance. I'm currently in **{len(guilds)}** servers.\n"
"**Are you sure you want to continue?**")
confirm = await ctx.react_confirm(msg, delete_after=True)
if confirm is not True:
await ctx.send("Database purge cancelled.")
return
output = ""
async with ctx.typing():
async with ctx.pool.acquire() as conn: # type: asyncpg.Connection
result = await conn.execute("DELETE FROM server_property WHERE NOT (server_id = ANY($1))", guilds)
output += f"Deleted {get_affected_count(result)} server property rows.\n"
result = await conn.execute("DELETE FROM server_prefixes WHERE NOT (server_id = ANY($1))", guilds)
output += f"Deleted {get_affected_count(result)} server prefixes rows.\n"
result = await conn.execute("DELETE FROM server_timezone WHERE NOT (server_id = ANY($1))", guilds)
output += f"Deleted {get_affected_count(result)} server timezones rows.\n"
result = await conn.execute("DELETE FROM ignored_entry WHERE NOT (server_id = ANY($1))", guilds)
output += f"Deleted {get_affected_count(result)} ignored entries.\n"
result = await conn.execute("DELETE FROM role_auto WHERE NOT (server_id = ANY($1))", guilds)
output += f"Deleted {get_affected_count(result)} automatic roles.\n"
result = await conn.execute("DELETE FROM role_joinable WHERE NOT (server_id = ANY($1))", guilds)
output += f"Deleted {get_affected_count(result)} joinable roles.\n"
result = await conn.execute("DELETE FROM watchlist WHERE NOT (server_id = ANY($1))", guilds)
output += f"Deleted {get_affected_count(result)} watchlists and their entries."
await ctx.success(output)
@checks.owner_only()
@commands.command(name="reload")
async def reload_cog(self, ctx: NabCtx, *, cog):
"""Reloads a cog (module)"""
# noinspection PyBroadException
try:
self.bot.unload_extension(cog)
self.bot.load_extension(cog)
except ModuleNotFoundError:
await ctx.error("Cog not found.")
except Exception:
await ctx.send(f'```py\n{traceback.format_exc()}\n```')
else:
await ctx.success(f"Cog reloaded successfully.")
@checks.owner_only()
@commands.command(name="reloadconfig")
async def reload_config(self, ctx: NabCtx):
"""Reloads the configuration file."""
try:
config.parse()
await ctx.success(f"Config file reloaded.")
except Exception:
await ctx.send(f'```py\n{traceback.format_exc()}\n```')
@commands.command(hidden=True)
@checks.owner_only()
async def repl(self, ctx: NabCtx):
"""Starts a REPL session in the current channel.
Similar to `eval`, but this keeps a running sesion where variables and results are stored.```.
"""
variables = {
"ctx": ctx,
"bot": self.bot,
"message": ctx.message,
"server": ctx.guild,
"guild": ctx.guild,
"channel": ctx.channel,
"author": ctx.author,
"_": None
}
variables.update(globals())
if ctx.channel.id in self.sessions:
await ctx.send('Already running a REPL session in this channel. Exit it with `quit`.')
return
self.sessions.add(ctx.channel.id)
await ctx.send('Enter code to execute or evaluate. `exit()` or `quit` to exit.')
while True:
def check(m):
return m.content.startswith('`') and m.author == ctx.author and m.channel == ctx.channel
try:
response = await self.bot.wait_for("message", check=check, timeout=10.0*60.0)
except asyncio.TimeoutError:
await ctx.send('Exiting REPL session.')
self.sessions.remove(ctx.channel.id)
break
cleaned = self.cleanup_code(response.content)
if cleaned in ('quit', 'exit', 'exit()'):
await ctx.send('Exiting.')
self.sessions.remove(ctx.channel.id)
return
executor = exec
if cleaned.count('\n') == 0:
# single statement, potentially 'eval'
try:
code = compile(cleaned, '<repl session>', 'eval')
except SyntaxError:
pass
else:
executor = eval
if executor is exec:
try:
code = compile(cleaned, '<repl session>', 'exec')
except SyntaxError as e:
await ctx.send(self.get_syntax_error(e))
continue
variables['message'] = response
fmt = None
stdout = io.StringIO()
try:
with redirect_stdout(stdout):
result = executor(code, variables)
if inspect.isawaitable(result):
result = await result
except Exception:
value = stdout.getvalue()
fmt = f'```py\n{value}{traceback.format_exc()}\n```'
else:
value = stdout.getvalue()
if result is not None:
fmt = f'```py\n{value}{result}\n```'
variables['_'] = result
elif value:
fmt = f'```py\n{value}\n```'
try:
if fmt is not None:
if len(fmt) > 2000:
await ctx.send("Content too big to be printed.")
else:
await ctx.send(fmt)
except discord.Forbidden:
pass
except discord.HTTPException as e:
await ctx.send(f'Unexpected error: `{e}`')
@commands.command()
@checks.owner_only()
async def shutdown(self, ctx: NabCtx):
"""Shutdowns the bot."""
await ctx.send('Shutting down...')
await self.bot.logout()
@commands.command()
@checks.owner_only()
async def sql(self, ctx: NabCtx, *, query: str):
"""Executes a SQL query and shows the results.
If the results are too long to display, a text file is generated and uploaded."""
query = self.cleanup_code(query)
async with ctx.pool.acquire() as conn:
try:
start = time.perf_counter()
results = await conn.fetch(query)
delta = (time.perf_counter() - start) * 1000.0
except asyncpg.PostgresError as e:
return await ctx.send(f'```py\n{e.__class__.__name__}: {e}\n```')
rows = len(results)
if rows == 0:
return await ctx.send(f'`{delta:.2f}ms: {results}`')
headers = list(results[0].keys())
table = TabularData()
table.set_columns(headers)
table.add_rows(list(r.values()) for r in results)
render = table.render()
fmt = f'```\n{render}\n```\n*Returned {rows} rows in {delta:2f}ms*'
if len(fmt) > 2000:
fp = io.BytesIO(fmt.encode('utf-8'))
await ctx.send('Too many results to display here', file=discord.File(fp, 'results.txt'))
else:
await ctx.send(fmt)
@checks.owner_only()
@checks.can_embed()
@commands.command()
async def servers(self, ctx: NabCtx, sort=None):
"""Shows a list of servers the bot is in.
Further information can be obtained using `serverinfo [id]`.
Values can be sorted by using one of the following values for sort:
- name
- members
- world
- created
- joined"""
entries = []
sorters = {
"name": (lambda g: g.name, False, lambda g: self.bot.tracked_worlds.get(g.id, 'None')),
"members": (lambda g: len(g.members), True, lambda g: f"{len(g.members):,} users"),
"world": (lambda g: self.bot.tracked_worlds.get(g.id, "|"), False,
lambda g: self.bot.tracked_worlds.get(g.id, 'None')),
"created": (lambda g: g.created_at, False, lambda g: f"Created: {g.created_at.date()}"),
"joined": (lambda g: g.me.joined_at, False, lambda g: f"Joined: {g.me.joined_at.date()}")
}
if sort is None:
sort = "name"
if sort not in sorters:
return await ctx.error(f"Invalid sort value. Valid values are: `{', '.join(sorters)}`")
guilds = sorted(self.bot.guilds, key=sorters[sort][0], reverse=sorters[sort][1])
for guild in guilds:
entries.append(f"**{guild.name}** (ID: **{guild.id}**) - {sorters[sort][2](guild)}")
pages = Pages(ctx, entries=entries, per_page=10)
pages.embed.title = f"Servers with {ctx.me.name}"
try:
await pages.paginate()
except CannotPaginate as e:
await ctx.error(e)
@commands.command(name="unload")
@checks.owner_only()
async def unload_cog(self, ctx: NabCtx, cog: str):
"""Unloads a cog."""
try:
self.bot.unload_extension(cog)
await ctx.success("Cog unloaded successfully.")
except Exception as e:
await ctx.error('{}: {}'.format(type(e).__name__, e))
@commands.command()
@checks.owner_only()
async def versions(self, ctx: NabCtx):
"""Shows version info about NabBot and its dependencies.
An X is displayed if the minimum required version is not met, this is likely to cause problems.
A warning sign is displayed when the version installed exceeds the highest version supported
This means there might be breaking changes, causing the bot to malfunction. This is not always the case.
A checkmark indicates that the dependency is inside the recommended range."""
def comp(operator, object1, object2):
if operator == ">=":
return object1 >= object2
elif operator == ">":
return object1 > object2
elif operator == "==":
return object1 == object2
elif operator == "<":
return object1 < object2
elif operator == "<=":
return object1 <= object2
embed = discord.Embed(title="NabBot", description="v"+self.bot.__version__)
embed.set_footer(text=f"Python v{platform.python_version()} on {platform.platform()}",
icon_url="https://www.python.org/static/apple-touch-icon-precomposed.png")
try:
with open("./requirements.txt") as f:
requirements = f.read()
except FileNotFoundError:
embed.add_field(name="Error", value="`requirements.txt` wasn't found in NabBot's root directory.")
await ctx.send(embed=embed)
return
dependencies = req_pattern.findall(requirements)
for package in dependencies:
version = pkg_resources.get_distribution(package[0]).version
if not comp(package[1], StrictVersion(version), StrictVersion(package[2])):
value = f"{ctx.tick(False)}v{version}\n`At least v{package[2]} expected`"
elif not comp(package[3], StrictVersion(version), StrictVersion(package[4])):
value = f"{config.warn_emoji}v{version}\n`Only below v{package[4]} tested`"
else:
value = f"{ctx.tick(True)}v{version}"
embed.add_field(name=package[0], value=value)
await ctx.send(embed=embed)
# endregion
# region Auxiliary functions
@staticmethod
def cleanup_code(content):
"""Automatically removes code blocks from the code."""
# remove ```py\n```
if content.startswith('```') and content.endswith('```'):
return '\n'.join(content.split('\n')[1:-1])
# remove `foo`
return content.strip('` \n')
@staticmethod
def get_syntax_error(e):
if e.text is None:
return '```py\n{0.__class__.__name__}: {0}\n```'.format(e)
return '```py\n{0.text}{1:>{0.offset}}\n{2}: {0}```'.format(e, '^', type(e).__name__)
# endregion
def setup(bot):
bot.add_cog(Owner(bot))