AngellusMortis/game_server_manager

View on GitHub
gs_manager/servers/base.py

Summary

Maintainability
D
2 days
Test Coverage
import getpass
import logging
import os
import signal
import tarfile
import time
from datetime import datetime
from distutils.dir_util import copy_tree
from shutil import copyfile, rmtree
from subprocess import DEVNULL, PIPE, STDOUT, CalledProcessError  # nosec
from typing import Callable, Dict, Iterable, List, Optional, Type, Union

import click
import psutil
from pygtail import Pygtail

from gs_manager.command import DEFAULT_CONFIG, Config, ServerCommandClass
from gs_manager.command.validators import (
    DirectoryConfigType,
    GenericConfigType,
    ServerDirectoryType,
)
from gs_manager.decorators import multi_instance, require, single_instance
from gs_manager.logger import get_logger
from gs_manager.null import NullServer
from gs_manager.utils import get_server_path, run_command

__all__ = [
    "EmptyServer",
    "BaseServer",
    "BaseServerConfig",
    "STATUS_SUCCESS",
    "STATUS_FAILED",
    "STATUS_PARTIAL_FAIL",
]

STATUS_SUCCESS = 0
STATUS_FAILED = 1
STATUS_PARTIAL_FAIL = 2


class BaseServerConfig(Config):
    multi_instance: bool = False

    _validators: Dict[str, List[GenericConfigType]] = {
        **Config._validators,
        **{
            "backup_directory": [ServerDirectoryType],
            "backup_location": [DirectoryConfigType],
        },
    }

    _excluded_properties: List[str] = Config._excluded_properties + [
        "multi_instance",
    ]

    name: str = "game_server"
    user: str = getpass.getuser()
    server_log: Optional[str] = None

    # start command config
    wait_start: int = 3
    max_start: int = 60
    spawn_process: bool = False
    start_command: str = None
    start_directory: str = ""

    # stop command config
    max_stop: int = 30
    pre_stop: int = 30
    stop_command: str = None

    # save command config
    save_command: str = None

    # say command config
    say_command: str = None

    # backup options
    backup_directory: str = ""
    backup_location: Optional[str] = None
    backup_days: int = 7
    backup_extra_paths: Optional[List[str]] = []

    @property
    def global_options(self):
        return {
            "all": [
                {
                    "param_decls": ("-n", "--name"),
                    "type": str,
                    "help": (
                        "Name of gameserver service, should be unique "
                        "across all gameservers to prevent ID conflicts. "
                        "Instance names will be appended to global name"
                    ),
                },
                {
                    "param_decls": ("-u", "--user"),
                    "type": str,
                    "help": ("User to run the game server as"),
                },
                {
                    "param_decls": ("-l", "--server-log"),
                    "type": str,
                    "help": ("Path to server log"),
                },
            ],
            "instance_enabled": [
                {
                    "param_decls": ("-i", "--current-instance"),
                    "type": str,
                    "help": "Current instance to run commands against.",
                },
                {
                    "param_decls": ("-p", "--parallel"),
                    "is_flag": True,
                    "help": "Used in conjuntion with -ci @all to run all "
                    "subcommands in parallel",
                },
            ],
        }


class EmptyServer(NullServer):
    """ Empty game server with no commands"""

    name: str = "empty"

    _config: Config

    config_class: Optional[Type[Config]] = None
    _logger: Optional[logging.getLoggerClass()] = None

    def __init__(self, config: Config):
        self._config = config

    @property
    def config(self) -> Config:
        self._config.update_config(self.context)
        return self._config

    @property
    def context(self) -> click.Context:
        return click.get_current_context()

    @property
    def logger(self) -> logging.getLoggerClass():
        if self._logger is None:
            self._logger = get_logger()
        return self._logger


class BaseServer(EmptyServer):
    """ Simple game server with common core commands"""

    name: str = "base"
    supports_multi_instance: bool = False

    config_class: Optional[Type[Config]] = BaseServerConfig
    _config: BaseServerConfig

    @property
    def config(self) -> BaseServerConfig:
        return super().config.current_instance

    def set_instance(
        self, instance_name: str, multi_instance: bool = False
    ) -> None:
        self._config.instance_name = instance_name
        self._config.multi_instance = multi_instance

    @property
    def server_name(self) -> str:
        if self.config.parent is None:
            return self.config.name
        return f"{self.config.parent.name}_{self.config.name}"

    @property
    def backup_name(self):
        if self.config.parent is None:
            return self.config.name
        return self.config.parent.name

    def _get_pid_filename(self) -> str:
        if self.config.parent is None:
            return ".pid_file"
        return f".pid_file_{self.config.name}"

    def _get_pid_file_path(self) -> str:
        return get_server_path(self._get_pid_filename())

    def _read_pid_file(self) -> Optional[int]:
        pid = None
        pid_file = self._get_pid_file_path()
        if os.path.isfile(pid_file):
            with open(pid_file, "r") as f:
                try:
                    pid = int(f.read().strip())
                    self.logger.debug("read pid: {}".format(pid))
                except ValueError:
                    pass
        return pid

    def _write_pid_file(self, pid: int) -> None:
        self.logger.debug("write pid: {}".format(pid))
        if pid is not None:
            pid_file = self._get_pid_file_path()
            with open(pid_file, "w") as f:
                f.write(str(pid))

    def _delete_pid_file(self) -> None:
        pid_file = self._get_pid_file_path()
        if os.path.isfile(pid_file):
            os.remove(pid_file)

    def _startup_check(self) -> int:
        self.logger.info("")

        def _wait_callback():
            if self.is_running() and self.is_accessible():
                return True

        self._wait(
            self.config.max_start,
            callback=_wait_callback,
            label="timeout",
            show_percent=False,
        )
        if self.is_running():
            if self.is_accessible():
                self.logger.success(f"\n{self.server_name} is running")
                return STATUS_SUCCESS
            else:
                self.logger.error(
                    f"{self.server_name} is running but not accesible"
                )
                return STATUS_PARTIAL_FAIL
        else:
            self.logger.error(f"could not start {self.server_name}")
            return STATUS_FAILED

    def _find_pid(self, require: bool = True) -> None:
        command = (
            self.config.start_command.replace('"', '\\"')
            .replace("?", "\\?")
            .replace("+", "\\+")
            .replace("|", ".")
            .strip()
        )

        pids = self.run_command(
            "ps -ef --sort=start_time | "
            f'grep -i -P "(?<!grep -i |-c ){command}$" | '
            "awk '{{print $2}}'"
        ).split("\n")

        self.logger.debug(f"pids: {pids}")

        for pid in pids:
            if pid is not None and not pid == "":
                self.run_command(f"ps -ef | grep {pid}")

        if pids[0] is None or pids[0] == "":
            if require:
                raise click.ClickException("could not determine PID")
        else:
            self._write_pid_file(pids[0])

    def _wait(
        self,
        seconds: int,
        callback: Optional[Callable] = None,
        label: Optional[str] = None,
        show_eta: bool = True,
        show_percent: bool = True,
    ) -> None:
        with click.progressbar(
            length=seconds,
            label=label,
            show_eta=show_eta,
            show_percent=show_percent,
        ) as waiter:
            for item in waiter:
                if callback is not None:
                    if callback():
                        break
                time.sleep(1)

    def _prestop(
        self, seconds: int, verb: str = "shutting down", reason: str = ""
    ) -> bool:
        if self._command_exists("say_command"):
            if reason != "":
                reason = f" {reason}"

            if seconds < 60:
                time = f"{seconds} seconds"
            else:
                minutes = seconds / 60
                seconds = seconds % 60
                time = f"{minutes} minutes and {seconds} seconds"

            message = f"Server is {verb} in {time}...{reason}"

            self.invoke(
                self.command,
                command_string=self.config.say_command.format(message),
                do_print=False,
            )
            return True
        return False

    def _stop(self, pid: Optional[int] = None) -> None:
        stopped = False
        if self._command_exists("stop_command"):
            if self._command_exists("save_command"):
                self.invoke(
                    self.command,
                    command_string=self.config.save_command,
                    do_print=False,
                )

            response = self.invoke(
                self.command,
                command_string=self.config.stop_command,
                do_print=False,
            )
            if response == STATUS_SUCCESS:
                stopped = True
            else:
                self.logger.debug("stop command failed, sending kill signal")

        if not stopped:
            if pid is None:
                pid = self.get_pid()
            if pid is not None:
                os.kill(pid, signal.SIGINT)

    def _command_exists(self, command: str) -> bool:
        return (
            hasattr(self, "command")
            and isinstance(self.command, click.Command)
            and hasattr(self.config, command)
            and getattr(self.config, command) is not None
        )

    def get_pid(self) -> int:
        return self._read_pid_file()

    def _is_running_single(self, delete_pid: bool = True) -> bool:
        pid = self.get_pid()
        if pid is not None:
            try:
                psutil.Process(pid)
            except psutil.NoSuchProcess:
                if delete_pid:
                    self._delete_pid_file()
            else:
                return True
        return False

    def is_running(
        self, check_all: bool = False, delete_pid: bool = True
    ) -> Union[bool, List[bool]]:
        """ checks if gameserver is running """

        if check_all and len(self.config.all_instance_names) > 0:
            current_instance = self.config.instance_name
            multi_instance = self.config.multi_instance

            is_running = []
            for instance_name in self.config.all_instance_names:
                self.set_instance(instance_name, True)
                is_running.append(self._is_running_single())

            self.set_instance(current_instance, multi_instance)
            return is_running

        return self._is_running_single()

    def is_accessible(self) -> bool:
        return self.is_running(delete_pid=False)

    def run_command(self, command: str, **kwargs) -> str:
        """ runs command with debug logging """

        self.logger.debug(f"run command ({self.config.user}: '{command}'")
        try:
            output = run_command(command, **kwargs)
        except Exception as ex:
            self.logger.debug("command exception: {}:{}".format(type(ex), ex))
            raise ex
        self.logger.debug("command output:")
        self.logger.debug(output)

        return output

    def invoke(self, method: Callable, *args, **kwargs) -> int:
        if "current_instance" not in kwargs and self.config.parent is not None:
            current_instance = self.config.parent.instance_name
            multi_instance = self.config.parent.multi_instance
            self.set_instance(None, False)
            kwargs["current_instance"] = current_instance
            response = self.context.invoke(method, *args, **kwargs)
            self.set_instance(current_instance, multi_instance)
            return response

        return self.context.invoke(method, *args, **kwargs)

    def kill_server(self) -> None:
        """ forcibly kills server process """

        pid = self.get_pid()
        if pid is not None:
            os.kill(pid, signal.SIGKILL)

    def delete_offset(self):
        offset_file = get_server_path(".log_offset")
        if os.path.isfile(offset_file):
            os.remove(offset_file)

    def tail_file(self, remove_offset: bool = True) -> Iterable:
        log_file = get_server_path(self.config.server_log)
        offset_file = get_server_path(".log_offset")
        if remove_offset:
            self.delete_offset()

        return Pygtail(log_file, offset_file=offset_file)

    @multi_instance
    @click.command(cls=ServerCommandClass)
    @click.pass_obj
    def print_config(self, *args, **kwargs) -> int:
        """ Debug tool to just print out your server config """

        config_dict = self.config.__dict__
        if "instance_overrides" in config_dict:
            config_dict.pop("instance_overrides")

        self.logger.info(f"Config for {self.server_name}")
        self.logger.info(config_dict)

        return STATUS_SUCCESS

    @require("start_command")
    @multi_instance
    @click.command(cls=ServerCommandClass)
    @click.pass_obj
    def status(self, *args, **kwargs) -> int:
        """ checks if gameserver is running or not """

        if not self.is_running():
            self._find_pid(False)

        if self.is_running():
            if self.is_accessible():
                self.logger.success(f"{self.server_name} is running")
                return STATUS_SUCCESS
            else:
                self.logger.error(
                    f"{self.server_name} is running, but is not accessible"
                )
                return STATUS_PARTIAL_FAIL
        else:
            self.logger.warning(f"{self.server_name} is not running")
            return STATUS_FAILED

    @require("start_command")
    @multi_instance
    @click.command(cls=ServerCommandClass)
    @click.option(
        "--no-verify",
        is_flag=True,
        help="Do not wait until gameserver is running before exiting",
    )
    @click.option(
        "-w",
        "--wait-start",
        type=int,
        help=(
            "Time (in seconds) to wait after running the command "
            "before checking the server"
        ),
    )
    @click.option(
        "-m",
        "--max-start",
        type=int,
        help=(
            "Max time (in seconds) to wait before assuming the "
            "server is deadlocked"
        ),
    )
    @click.option(
        "--spawn-process",
        is_flag=True,
        help=(
            "Spawn a new process in the background detached from the "
            "main process"
        ),
    )
    @click.option(
        "-f",
        "--foreground",
        is_flag=True,
        help=(
            "Start gameserver in foreground. Ignores "
            "spawn_process, screen, and any other "
            "options or classes that cause server to run "
            "in background."
        ),
    )
    @click.option(
        "--start-directory",
        type=str,
        help=("Directory to run the start command in relative to server_path"),
    )
    @click.option("--start-command", type=str, help="Start up command")
    @click.pass_obj
    def start(
        self,
        no_verify: bool,
        foreground: bool,
        start_command: Optional[str] = None,
        *args,
        **kwargs,
    ) -> int:
        """ starts gameserver """

        if self.is_running():
            self.logger.warning(f"{self.server_name} is already running")
            return STATUS_PARTIAL_FAIL

        self._delete_pid_file()
        self.logger.info(f"starting {self.server_name}...", nl=False)

        command = start_command or self.config.start_command
        popen_kwargs = {}
        if self.config.spawn_process and not foreground:
            log_file_path = get_server_path(
                ["logs", f"{self.backup_name}.log"]
            )

            command = f"nohup {command}"
            popen_kwargs = {
                "return_process": True,
                "redirect_output": False,
                "stdin": DEVNULL,
                "stderr": STDOUT,
                "stdout": PIPE,
                "start_new_session": True,
            }
        elif foreground:
            popen_kwargs = {
                "redirect_output": False,
            }

        try:
            response = self.run_command(
                command,
                cwd=get_server_path(self.config.start_directory),
                **popen_kwargs,
            )
        except CalledProcessError:
            self.logger.error("unexpected error from server")

        if foreground:
            return

        if self.config.spawn_process:
            self.run_command(
                f"cat > {log_file_path}",
                return_process=True,
                redirect_output=False,
                stdin=response.stdout,
                stderr=DEVNULL,
                stdout=DEVNULL,
            )

        if self.config.wait_start > 0:
            time.sleep(self.config.wait_start)

        self._find_pid()
        if no_verify:
            return STATUS_SUCCESS
        return self._startup_check()

    @multi_instance
    @click.command(cls=ServerCommandClass)
    @click.option(
        "-f",
        "--force",
        is_flag=True,
        help=(
            "Force kill the server. WARNING: server will not have "
            "chance to save"
        ),
    )
    @click.option(
        "--max-stop",
        type=int,
        help="Max time (in seconds) to wait for server to stop",
    )
    @click.option(
        "--pre-stop",
        type=int,
        help=(
            "Time (in seconds) before stopping the server to "
            "allow notifing users."
        ),
    )
    @click.option(
        "-r",
        "--reason",
        type=str,
        help="Reason the server is stopping",
        default="",
    )
    @click.option("-v", "--verb", type=str, help="Shutdown verb", default="")
    @click.pass_obj
    def stop(
        self, force: bool, reason: str, verb: str, *args, **kwargs
    ) -> int:
        """ stops gameserver """

        if verb == "":
            if force:
                verb = "killing"
            else:
                verb = "shutting down"

        if self.is_running():
            if self.config.pre_stop > 0 and not force:
                if self._prestop(self.config.pre_stop, verb, reason):
                    self.logger.info("notifiying users...")
                    self._wait(self.config.pre_stop)

            self.logger.info(f"{verb} {self.server_name}...")

            if force:
                self.kill_server()
                time.sleep(1)
            else:
                self._stop()

                def _wait_callback():
                    if not self.is_running():
                        return True

                self._wait(
                    self.config.max_stop,
                    callback=_wait_callback,
                    label="timeout",
                    show_percent=False,
                )

            if self.is_running():
                self.logger.error(f"could not stop {self.server_name}")
                return STATUS_PARTIAL_FAIL
            else:
                self.logger.success(f"{self.server_name} was stopped")
                return STATUS_SUCCESS
        else:
            self.logger.warning(f"{self.server_name} is not running")
            return STATUS_FAILED

    @multi_instance
    @click.command(cls=ServerCommandClass)
    @click.option(
        "-f",
        "--force",
        is_flag=True,
        help=(
            "Force kill the server. WARNING: server will not have "
            "chance to save"
        ),
    )
    @click.option(
        "--no-verify",
        is_flag=True,
        help="Do not wait until gameserver is running before exiting",
    )
    @click.option(
        "-r",
        "--reason",
        type=str,
        help="Reason the server is restarting",
        default="",
    )
    @click.pass_obj
    def restart(
        self, force: bool, no_verify: bool, reason: str, *args, **kwargs
    ):
        """ restarts gameserver"""

        if self.is_running():
            self.invoke(
                self.stop, force=force, verb="restarting", reason=reason
            )
        return self.invoke(self.start, no_verify=no_verify, foreground=False)

    @single_instance
    @click.command(cls=ServerCommandClass)
    @click.option(
        "-f",
        "--force",
        is_flag=True,
        help="Edit file even though server is running",
    )
    @click.argument("edit_path", type=click.Path())
    @click.pass_obj
    def edit(self, force: bool, edit_path: str, *args, **kwargs) -> int:
        """ edits a server file with your default editor """

        if not force and self.is_running():
            self.logger.warning(f"{self.server_name} is still running")
            return STATUS_PARTIAL_FAIL

        file_path = get_server_path(edit_path)
        editor = os.environ.get("EDITOR") or "vim"

        self.run_command(
            f"{editor} {file_path}", redirect_output=False,
        )
        return STATUS_SUCCESS

    @multi_instance
    @click.command(cls=ServerCommandClass)
    @click.option(
        "-f", "--follow", is_flag=True, help="Follow log file",
    )
    @click.option(
        "-n",
        "--num",
        default=20,
        type=int,
        help="Number of lines to list, use -1 to list all",
    )
    @click.pass_obj
    def tail(self, follow: bool, num: int, *args, **kwargs) -> int:
        """ edits a server file with your default editor """

        self.delete_offset()

        while True:
            tail = self.tail_file(remove_offset=False)
            lines = tail.readlines()
            if num > 0:
                lines = lines[-num:]

            for line in lines:
                self.logger.info(line.strip())

            if not follow:
                break

            time.sleep(1)

        return STATUS_SUCCESS

    @require("backup_directory")
    @require("backup_location")
    @single_instance
    @click.command(cls=ServerCommandClass)
    @click.option(
        "--backup-location",
        type=click.Path(),
        help="Location to store backup files",
    )
    @click.option(
        "--backup-directory",
        type=click.Path(),
        help="Directory realtive to server-path to backup",
    )
    @click.option(
        "--backup-days",
        type=int,
        help="Number of days worth of backups to keep",
    )
    @click.pass_obj
    def backup(self, *args, **kwargs) -> int:
        """ edits a server file with your default editor """

        backup_folder = os.path.join(self.config.backup_location, "backups")
        timestamp = (
            datetime.now().isoformat(timespec="minutes").replace(":", "-")
        )
        backup_file = f"{self.backup_name}_{timestamp}.tar.gz"

        os.makedirs(backup_folder, exist_ok=True)

        if self._command_exists("save_command"):
            self.logger.info(f"Saving servers...")
            current_instance = self.config.instance_name
            multi_instance = self.config.multi_instance
            self.set_instance(None, False)
            self.invoke(
                self.command,
                command_string=self.config.save_command,
                do_print=False,
                parallel=True,
                current_instance="@all",
            )
            self.set_instance(current_instance, multi_instance)

        self.logger.info(f"Making server backup ({backup_file})...")
        with tarfile.open(
            os.path.join(backup_folder, backup_file), "w:gz"
        ) as tar:
            tar.add(
                get_server_path(self.config.backup_directory),
                arcname=self.config.backup_directory,
            )
            tar.add(self.config.config_path, arcname=DEFAULT_CONFIG)
            for path in self.config.backup_extra_paths:
                if os.path.exists(path):
                    tar.add(path, os.path.basename(path))
                else:
                    self.logger.warning(f"{path} does not exist")

        old_backups = []
        now = time.time()
        for backup in os.listdir(backup_folder):
            abs_path = os.path.join(backup_folder, backup)
            if os.stat(abs_path).st_mtime < now - 7 * 86400:
                old_backups.append(abs_path)

        if len(old_backups) > 0:
            self.logger.info(f"Deleting {len(old_backups)} old backups...")

            for backup in old_backups:
                os.remove(backup)

        return STATUS_SUCCESS

    @require("backup_directory")
    @require("backup_location")
    @single_instance
    @click.command(cls=ServerCommandClass)
    @click.option(
        "--backup-location",
        type=click.Path(),
        help="Location to store backup files",
    )
    @click.option(
        "--backup-directory",
        type=click.Path(),
        help="Directory realtive to server-path to backup",
    )
    @click.option("--list-backups", is_flag=True, help="List backup files")
    @click.option(
        "--num",
        type=int,
        default=10,
        help="Number of backups to list. Use -1 to list all",
    )
    @click.argument("backup_num", default=0, type=int)
    @click.pass_obj
    def restore(
        self, list_backups: bool, num: int, backup_num: int, *args, **kwargs
    ) -> int:
        """ edits a server file with your default editor """

        backup_folder = os.path.join(self.config.backup_location, "backups")
        restore_folder = os.path.join(self.config.backup_location, "restore")
        backups = []

        if os.path.isdir(backup_folder):
            for backup in os.listdir(backup_folder):
                if backup.startswith(self.backup_name):
                    backups.append(backup)
            backups = sorted(backups)

        if list_backups:
            if num >= 0:
                backups = backups[:num]

            for index, backup in enumerate(backups):
                self.logger.info(f"{index:2}: {backup}")
            return STATUS_SUCCESS

        if backup_num > len(backups):
            self.logger.error(f"Backup {backup_num} does not exist")
            return STATUS_FAILED

        if self.is_running():
            self.logger.error(f"{self.server_name} is still running")
            return STATUS_FAILED

        self.logger.info("Cleaning up previous restore...")
        for old_backup in os.listdir(self.config.backup_location):
            if old_backup.endswith(".tar.gz"):
                os.remove(
                    os.path.join(self.config.backup_location, old_backup)
                )

        if os.path.isdir(restore_folder):
            rmtree(restore_folder)
        os.mkdir(restore_folder)

        self.logger.info("Extacting backup...")
        backup_file = os.path.join(
            self.config.backup_location, backups[backup_num]
        )
        copyfile(os.path.join(backup_folder, backups[backup_num]), backup_file)

        with tarfile.open(backup_file) as tar:
            tar.extractall(path=restore_folder)

        config_file = os.path.join(restore_folder, DEFAULT_CONFIG)
        if os.path.isfile(config_file):
            os.remove(config_file)

        self.logger.info("Restoring backup...")
        copy_tree(
            os.path.join(restore_folder, self.config.backup_directory),
            get_server_path(self.config.backup_directory),
            preserve_times=1,
        )
        return STATUS_SUCCESS


class TestServer(BaseServer):
    name: str = "test"
    supports_multi_instance: bool = True