redbot/setup.py
from redbot import _early_init
# this needs to be called as early as possible
_early_init()
import asyncio
import json
import logging
import sys
import re
from copy import deepcopy
from pathlib import Path
from typing import Dict, Any, Optional, Union
import click
from redbot.core._cli import confirm
from redbot.core.utils._internal_utils import (
safe_delete,
create_backup as red_create_backup,
cli_level_to_log_level,
)
from redbot.core import config, data_manager, _drivers
from redbot.core._cli import ExitCodes
from redbot.core.data_manager import appdir, config_dir, config_file
from redbot.core._drivers import BackendType, IdentifierData
conversion_log = logging.getLogger("red.converter")
try:
config_dir.mkdir(parents=True, exist_ok=True)
except PermissionError:
print("You don't have permission to write to '{}'\nExiting...".format(config_dir))
sys.exit(ExitCodes.CONFIGURATION_ERROR)
instance_data = data_manager.load_existing_config()
if instance_data is None:
instance_list = []
else:
instance_list = list(instance_data.keys())
def save_config(name, data, remove=False):
_config = data_manager.load_existing_config()
if remove and name in _config:
_config.pop(name)
else:
_config[name] = data
with config_file.open("w", encoding="utf-8") as fs:
json.dump(_config, fs, indent=4)
def get_data_dir(*, instance_name: str, data_path: Optional[Path], interactive: bool) -> str:
if data_path is not None:
return str(data_path.resolve())
data_path = Path(appdir.user_data_dir) / "data" / instance_name
if not interactive:
return str(data_path.resolve())
print(
"We've attempted to figure out a sane default data location which is printed below."
" If you don't want to change this default please press [ENTER],"
" otherwise input your desired data location."
)
print()
print("Default: {}".format(data_path))
data_path_input = input("> ")
if data_path_input != "":
data_path = Path(data_path_input)
try:
exists = data_path.exists()
except OSError:
print(
"We were unable to check your chosen directory."
" Provided path may contain an invalid character."
)
sys.exit(ExitCodes.INVALID_CLI_USAGE)
if not exists:
try:
data_path.mkdir(parents=True, exist_ok=True)
except OSError:
print(
"We were unable to create your chosen directory."
" You may need to create the directory and set proper permissions"
" for it manually before it can be used as the data directory."
)
sys.exit(ExitCodes.INVALID_CLI_USAGE)
print("You have chosen {} to be your data directory.".format(data_path))
if not click.confirm("Please confirm", default=True):
print("Please start the process over.")
sys.exit(ExitCodes.CRITICAL)
return str(data_path.resolve())
def get_storage_type(backend: Optional[str], *, interactive: bool):
if backend:
return get_target_backend(backend)
if not interactive:
return BackendType.JSON
storage_dict = {1: BackendType.JSON, 2: BackendType.POSTGRES}
storage = None
while storage is None:
print()
print("Please choose your storage backend.")
print("1. JSON (file storage, requires no database).")
print("2. PostgreSQL (Requires a database server)")
print("If you're unsure, press [ENTER] to use the recommended default - JSON.")
storage = input("> ")
if not storage:
return BackendType.JSON
try:
storage = int(storage)
except ValueError:
storage = None
else:
if storage not in storage_dict:
storage = None
return storage_dict[storage]
def get_name(name: str) -> str:
INSTANCE_NAME_RE = re.compile(
r"""
[a-z0-9] # starts with letter or digit
(?:
(?!.*[_\.\-]{2}) # ensure no consecutive dots, hyphens, or underscores
[a-z0-9_\.\-]* # match allowed characters
[a-z0-9] # ensure string ends with letter or digit
)? # optional to allow strings of length 1
""",
re.VERBOSE | re.IGNORECASE,
)
if name:
if INSTANCE_NAME_RE.fullmatch(name) is None:
print(
"ERROR: Instance names need to start and end with a letter or a number"
" and can only include characters A-z, numbers,"
" and non-consecutive underscores (_) and periods (.)."
)
sys.exit(ExitCodes.INVALID_CLI_USAGE)
return name
while len(name) == 0:
print(
"Please enter a name for your instance,"
" it will be used to run your bot from here on out.\n"
"This name is case-sensitive, needs to start and end with a letter or a number"
" and should only include characters A-z, numbers,"
" and non-consecutive underscores (_) and periods (.)."
)
name = input("> ")
if not name:
pass
elif INSTANCE_NAME_RE.fullmatch(name) is None:
print(
"ERROR: Instance names need to start and end with a letter or a number"
" and can only include characters A-z, numbers,"
" and non-consecutive underscores (_) and periods (.)."
)
name = ""
elif "-" in name and not confirm(
"Hyphens (-) in instance names may cause issues. Are you sure you want to continue with this instance name?",
default=False,
):
name = ""
print() # new line for aesthetics
return name
def basic_setup(
*,
name: str,
data_path: Optional[Path],
backend: Optional[str],
interactive: bool,
overwrite_existing_instance: bool,
):
"""
Creates the data storage folder.
:return:
"""
if not interactive and not name:
print(
"Providing instance name through --instance-name is required"
" when using non-interactive mode."
)
sys.exit(ExitCodes.INVALID_CLI_USAGE)
if interactive:
print(
"Hello! Before we begin, we need to gather some initial information"
" for the new instance."
)
name = get_name(name)
default_data_dir = get_data_dir(
instance_name=name, data_path=data_path, interactive=interactive
)
default_dirs = deepcopy(data_manager.basic_config_default)
default_dirs["DATA_PATH"] = default_data_dir
storage_type = get_storage_type(backend, interactive=interactive)
default_dirs["STORAGE_TYPE"] = storage_type.value
driver_cls = _drivers.get_driver_class(storage_type)
default_dirs["STORAGE_DETAILS"] = driver_cls.get_config_details()
if name in instance_data:
if overwrite_existing_instance:
pass
elif interactive:
print(
"WARNING: An instance already exists with this name. "
"Continuing will overwrite the existing instance config."
)
if not click.confirm(
"Are you absolutely certain you want to continue?", default=False
):
print("Not continuing")
sys.exit(ExitCodes.SHUTDOWN)
else:
print(
"An instance with this name already exists.\n"
"If you want to remove the existing instance and replace it with this one,"
" run this command with --overwrite-existing-instance flag."
)
sys.exit(ExitCodes.INVALID_CLI_USAGE)
save_config(name, default_dirs)
if interactive:
print()
print(
f"Your basic configuration has been saved. Please run `redbot {name}` to"
" continue your setup process and to run the bot.\n\n"
"First time? Read the quickstart guide:\n"
"https://docs.discord.red/en/stable/getting_started.html"
)
else:
print("Your basic configuration has been saved.")
def get_current_backend(instance: str) -> BackendType:
return BackendType(instance_data[instance]["STORAGE_TYPE"])
def get_target_backend(backend: str) -> BackendType:
if backend == "json":
return BackendType.JSON
elif backend == "postgres":
return BackendType.POSTGRES
async def do_migration(
current_backend: BackendType, target_backend: BackendType
) -> Dict[str, Any]:
cur_driver_cls = _drivers._get_driver_class_include_old(current_backend)
new_driver_cls = _drivers.get_driver_class(target_backend)
cur_storage_details = data_manager.storage_details()
new_storage_details = new_driver_cls.get_config_details()
await cur_driver_cls.initialize(**cur_storage_details)
await new_driver_cls.initialize(**new_storage_details)
await config.migrate(cur_driver_cls, new_driver_cls)
await cur_driver_cls.teardown()
await new_driver_cls.teardown()
return new_storage_details
async def create_backup(instance: str, destination_folder: Path = Path.home()) -> None:
data_manager.load_basic_configuration(instance)
backend_type = get_current_backend(instance)
if backend_type != BackendType.JSON:
await do_migration(backend_type, BackendType.JSON)
print("Backing up the instance's data...")
driver_cls = _drivers.get_driver_class()
await driver_cls.initialize(**data_manager.storage_details())
backup_fpath = await red_create_backup(destination_folder)
await driver_cls.teardown()
if backup_fpath is not None:
print(f"A backup of {instance} has been made. It is at {backup_fpath}")
else:
print("Creating the backup failed.")
async def remove_instance(
instance: str,
interactive: bool = False,
delete_data: Optional[bool] = None,
_create_backup: Optional[bool] = None,
drop_db: Optional[bool] = None,
remove_datapath: Optional[bool] = None,
) -> None:
data_manager.load_basic_configuration(instance)
backend = get_current_backend(instance)
if interactive is True and delete_data is None:
msg = "Would you like to delete this instance's data?"
if backend != BackendType.JSON:
msg += " The database server must be running for this to work."
delete_data = click.confirm(msg, default=False)
if interactive is True and _create_backup is None:
msg = "Would you like to make a backup of the data for this instance?"
if backend != BackendType.JSON:
msg += " The database server must be running for this to work."
_create_backup = click.confirm(msg, default=False)
if _create_backup is True:
await create_backup(instance)
driver_cls = _drivers.get_driver_class(backend)
if delete_data is True:
await driver_cls.initialize(**data_manager.storage_details())
try:
await driver_cls.delete_all_data(interactive=interactive, drop_db=drop_db)
finally:
await driver_cls.teardown()
if interactive is True and remove_datapath is None:
remove_datapath = click.confirm(
"Would you like to delete the instance's entire datapath?", default=False
)
if remove_datapath is True:
data_path = data_manager.core_data_path().parent
safe_delete(data_path)
save_config(instance, {}, remove=True)
print("The instance {} has been removed.".format(instance))
async def remove_instance_interaction() -> None:
if not instance_list:
print("No instances have been set up!")
return
print(
"You have chosen to remove an instance. The following "
"is a list of instances that currently exist:\n"
)
for instance in instance_data.keys():
print("{}\n".format(instance))
print("Please select one of the above by entering its name")
selected = input("> ")
if selected not in instance_data.keys():
print("That isn't a valid instance!")
return
await remove_instance(selected, interactive=True)
@click.group(invoke_without_command=True)
@click.option(
"--debug",
"--verbose",
"-v",
count=True,
help=(
"Increase the verbosity of the logs, each usage of this flag increases the verbosity"
" level by 1."
),
)
@click.option(
"--no-prompt",
"interactive",
type=bool,
is_flag=True,
default=True,
help=(
"Don't ask for user input during the process (non-interactive mode)."
" This makes `--instance-name` required."
),
)
@click.option(
"--instance-name",
type=str,
default="",
help="Name of the new instance. Required if --no-prompt is passed.",
)
@click.option(
"--data-path",
type=click.Path(exists=False, dir_okay=True, file_okay=False, writable=True, path_type=Path),
default=None,
help=(
"Data path of the new instance. If this option and --no-prompt are omitted,"
" you will be asked for this."
),
)
@click.option(
"--backend",
type=click.Choice(["json", "postgres"]),
default=None,
help=(
"Choose a backend type for the new instance."
" If this option is omitted, you will be asked for this."
" Defaults to JSON in non-interactive mode.\n"
"Note: Choosing PostgreSQL will prevent the setup from being completely non-interactive."
),
)
@click.option(
"--overwrite-existing-instance",
type=bool,
is_flag=True,
help=(
"Confirm overwriting of existing instance.\n"
"Note: This removes *metadata* about the existing instance with that name."
),
)
@click.pass_context
def cli(
ctx: click.Context,
debug: bool,
interactive: bool,
instance_name: str,
data_path: Optional[Path],
backend: Optional[str],
overwrite_existing_instance: bool,
) -> None:
"""Create a new instance."""
level = cli_level_to_log_level(debug)
base_logger = logging.getLogger("red")
base_logger.setLevel(level)
formatter = logging.Formatter(
"[{asctime}] [{levelname}] {name}: {message}", datefmt="%Y-%m-%d %H:%M:%S", style="{"
)
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(formatter)
base_logger.addHandler(stdout_handler)
if ctx.invoked_subcommand is None:
basic_setup(
name=instance_name,
data_path=data_path,
backend=backend,
overwrite_existing_instance=overwrite_existing_instance,
interactive=interactive,
)
@cli.command()
@click.argument("instance", type=click.Choice(instance_list), metavar="<INSTANCE_NAME>")
@click.option(
"--no-prompt",
"interactive",
is_flag=True,
default=True,
help="Don't ask for user input during the process.",
)
@click.option(
"--delete-data/--no-delete-data",
"delete_data",
is_flag=True,
default=None,
help=(
"Delete this instance's data. "
"If these options and --no-prompt are omitted, you will be asked about this."
),
)
@click.option(
"--backup/--no-backup",
"_create_backup",
is_flag=True,
default=None,
help=(
"Create backup of this instance's data. "
"If these options and --no-prompt are omitted, you will be asked about this."
),
)
@click.option(
"--drop-db/--no-drop-db",
is_flag=True,
default=None,
help=(
"Drop the entire database containing this instance's data. Has no effect on JSON "
"instances, or if --no-delete-data is set. If these options and --no-prompt are omitted,"
"you will be asked about this."
),
)
@click.option(
"--remove-datapath/--no-remove-datapath",
is_flag=True,
default=None,
help=(
"Remove this entire instance's datapath. If these options and --no-prompt are omitted, "
"you will be asked about this. NOTE: --remove-datapath will override --no-delete-data "
"for JSON instances."
),
)
def delete(
instance: str,
interactive: bool,
delete_data: Optional[bool],
_create_backup: Optional[bool],
drop_db: Optional[bool],
remove_datapath: Optional[bool],
) -> None:
"""Removes an instance."""
asyncio.run(
remove_instance(
instance, interactive, delete_data, _create_backup, drop_db, remove_datapath
)
)
@cli.command()
@click.argument("instance", type=click.Choice(instance_list), metavar="<INSTANCE_NAME>")
@click.argument("backend", type=click.Choice(["json", "postgres"]))
def convert(instance: str, backend: str) -> None:
"""Convert data backend of an instance."""
current_backend = get_current_backend(instance)
target = get_target_backend(backend)
data_manager.load_basic_configuration(instance)
default_dirs = deepcopy(data_manager.basic_config_default)
default_dirs["DATA_PATH"] = str(Path(instance_data[instance]["DATA_PATH"]))
if current_backend == BackendType.MONGOV1:
raise RuntimeError("Please see the 3.2 release notes for upgrading a bot using mongo.")
else:
new_storage_details = asyncio.run(do_migration(current_backend, target))
if new_storage_details is not None:
default_dirs["STORAGE_TYPE"] = target.value
default_dirs["STORAGE_DETAILS"] = new_storage_details
save_config(instance, default_dirs)
conversion_log.info(f"Conversion to {target} complete.")
else:
conversion_log.info(
f"Cannot convert {current_backend.value} to {target.value} at this time."
)
@cli.command()
@click.argument("instance", type=click.Choice(instance_list), metavar="<INSTANCE_NAME>")
@click.argument(
"destination_folder",
type=click.Path(
dir_okay=True, file_okay=False, resolve_path=True, writable=True, path_type=Path
),
default=Path.home(),
)
def backup(instance: str, destination_folder: Path) -> None:
"""Backup instance's data."""
asyncio.run(create_backup(instance, destination_folder))
def run_cli():
# Setuptools entry point script stuff...
try:
cli() # pylint: disable=no-value-for-parameter # click
except KeyboardInterrupt:
print("Exiting...")
else:
print("Exiting...")
if __name__ == "__main__":
run_cli()