volt/server.py

Summary

Maintainability
A
45 mins
Test Coverage
C
71%
"""Development server."""

# Copyright (c) 2012-2023 Wibowo Arindrarto <contact@arindrarto.dev>
# SPDX-License-Identifier: BSD-3-Clause

import queue
import signal
import sys
import socket
import threading
import time
from contextlib import suppress
from datetime import datetime as dt
from http import HTTPStatus
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import cast, Any, Callable, NoReturn, Optional, Self
from webbrowser import open as open_browser

import structlog
from blinker import signal as blinker_signal
from click import echo
from click._compat import get_text_stderr
from structlog.contextvars import bound_contextvars
from watchdog import events
from watchdog.observers import Observer

from . import __version__, constants, signals as blinker_signals
from .config import Config
from .error import _VoltServerExit
from ._logging import style


__all__ = ["make_server"]


log = structlog.get_logger(__name__)


_pre_server_serve = blinker_signal("_pre_server_serve")


class _RunFile:
    DRAFT_ON = "draft"
    DRAFT_OFF = "no-draft"

    @classmethod
    def from_config(cls, config: Config, draft: bool) -> Self:
        log.debug("creating server run file object from config", draft=draft)
        return cls(path=config._server_run_path, draft=draft)

    @classmethod
    def from_path(cls, path: Path) -> Optional[Self]:
        with bound_contextvars(path=path):
            log.debug("creating server run file object from existing file")
            if not path.exists():
                log.debug("no server run file found")
                return None

        draft = path.read_text().strip() == cls.DRAFT_ON
        return cls(path, draft)

    def __init__(self, path: Path, draft: bool) -> None:
        self._path = path
        self._draft = draft

    @property
    def path(self) -> Path:
        return self._path

    @property
    def draft(self) -> bool:
        return self._draft

    def toggle_draft(self, value: Optional[bool]) -> Self:
        log.debug("toggling server draft mode", value=value)
        new_value = value if value is not None else (not self._draft)
        self._draft = new_value
        log.debug("toggled server draft mode", value=self.draft)
        return self

    def dump(self) -> None:
        log.debug("writing server run file", path=self.path, draft=self.draft)
        self.path.write_text(self.DRAFT_ON if self.draft else self.DRAFT_OFF)
        return None

    def remove(self) -> None:
        log.debug("removing server run file", path=self.path)
        with suppress(OSError):
            self.path.unlink()


def make_server(
    config: Config,
    host: str,
    port: int,
    with_draft: bool,
    with_sig_handlers: bool,
    log_level: str,
    log_color: bool,
) -> Callable[[bool], None]:
    class HTTPRequestHandler(SimpleHTTPRequestHandler):
        server_version = f"volt-dev-server/{__version__}"

        def __init__(self, *args: Any, **kwargs: Any) -> None:
            kwargs["directory"] = f"{config.output_dir}"
            super().__init__(*args, **kwargs)

        if log_level in {"warning", "error", "critical"}:

            def log_message(self, fmt: str, *args: Any) -> None:
                return None

        else:

            def log_message(self, fmt: str, *args: Any) -> None:
                # overrides parent log_message to provide a more compact output.
                method: str = args[0]
                status: HTTPStatus = args[1]
                path: str = args[2]

                code = f"{status.value}"
                if status.value >= 400:
                    code = style(code, fg="red", bold=True)
                elif status.value >= 300:
                    code = style(code, fg="yellow", bold=True)
                else:
                    code = style(code, fg="cyan", bold=True)

                path = style(path, fg="bright_blue")

                echo(fmt % (code, method, path), file=get_text_stderr())

        def log_request(
            self,
            code: str | int = "-",
            size: str | int = "-",
        ) -> Any:
            ts = dt.now().strftime("%H:%M:%S.%f")
            if log_color:
                fmt = '%30s | %%s · %%s "%%s"' % style(ts, fg="bright_black")
            else:
                fmt = '%21s - %%s · %%s "%%s"' % style(ts, fg="bright_black")
            method, path = self.requestline[:-9].split(" ", 1)
            self.log_message(fmt, method, cast(HTTPStatus, code), path)

        def log_error(self, *args: Any) -> None:
            # overrides parent log_error to reduce noise.
            pass

    run_file = _RunFile.from_config(config, with_draft)
    run_file.dump()

    def serve(with_open_browser: bool) -> None:
        httpd = ThreadingHTTPServer((host, port), HTTPRequestHandler)

        if with_sig_handlers:

            def signal_handler(signum: int, frame: Any) -> NoReturn:
                try:
                    httpd.server_close()
                finally:
                    if signum == signal.SIGINT:
                        print("", file=sys.stderr, flush=True)
                    log.info(f"dev server stopped ({signal.strsignal(signum)})")
                    raise _VoltServerExit(run_file_path=run_file.path)

            signal.signal(signal.SIGINT, signal_handler)
            signal.signal(signal.SIGTERM, signal_handler)

        blinker_signals.send(
            _pre_server_serve,
            with_open_browser=with_open_browser,
            host=host,
            port=port,
        )
        log.info("dev server listening", url=f"http://{host}:{port}")
        httpd.serve_forever()

    return serve


_browser_opened = False


@_pre_server_serve.connect
def _open_browser_handler(
    _: Any,
    with_open_browser: bool,
    host: str,
    port: int,
) -> None:
    if not with_open_browser:
        return None

    global _browser_opened

    if _browser_opened:
        return None

    if _wait_ready(host, port):
        log.debug("opening browser", host=host, port=port)
        open_browser(f"http://{host}:{port}")
        _browser_opened = True

    return None


def _wait_ready(
    host: str,
    port: int,
    max_attempts: int = 10,
    wait_per_attempt: float = 0.1,
) -> bool:
    with bound_contextvars(host=host, port=port):
        log.debug("checking if server is ready")
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        num_attempts = 0
        while num_attempts < max_attempts:
            num_attempts += 1
            try:
                s.connect((host, port))
                s.close()
                log.debug("server is ready")
                return True
            except OSError:
                log.debug("waiting for server to be ready", num_attempts=num_attempts)
                time.sleep(wait_per_attempt)

    return False


class _SyncQueue(queue.Queue):
    """A queue of size=1 that drops events sent to it while it processes tasks"""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        kwargs["maxsize"] = 1
        super().__init__(*args, **kwargs)
        self._putlock = threading.Lock()

    def put(
        self,
        item: Any,
        block: bool = True,
        timeout: Optional[float] = None,
    ) -> None:
        if not self._putlock.acquire(blocking=False):
            return

        if self.unfinished_tasks > 0:
            self._putlock.release()
            return

        with suppress(queue.Full):
            super().put(item, False, timeout=None)

        self._putlock.release()


class _BuildObserver(Observer):
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)  # type: ignore[no-untyped-call]
        self._event_queue = _SyncQueue()


class _BuildHandler(events.RegexMatchingEventHandler):
    def __init__(self, config: Config, build_func: Callable) -> None:
        prefix = f"{config.project_dir_rel}".replace(".", r"\.")
        regexes = [
            *[
                f"^{prefix + '/' + dir_name + '/'}.+$"
                for dir_name in (
                    constants.PROJECT_EXTENSION_DIR_NAME,
                    constants.PROJECT_CONTENTS_DIR_NAME,
                    (
                        f"{constants.PROJECT_CONTENTS_DIR_NAME}"
                        f"/{constants.PROJECT_STATIC_DIR_NAME}"
                    ),
                    constants.SITE_THEMES_DIR_NAME,
                )
            ],
            f"^{prefix + '/' + constants.CONFIG_FILE_NAME}$",
            f"^{prefix + '/' + constants.SERVER_RUN_FILE_NAME}$",
        ]
        ignore_regexes = [
            f"^{prefix + '/' + constants.PROJECT_OUTPUT_DIR_NAME + '/'}.+$",
            ".*__pycache__.*",
        ]
        super().__init__(
            regexes,
            ignore_regexes,
            case_sensitive=True,
        )  # type: ignore[no-untyped-call]
        self.config = config
        self._build = build_func

    def on_any_event(self, event: Any) -> None:
        log_attrs: dict = {}
        match type(event):
            case events.FileCreatedEvent:
                log_attrs = dict(
                    reason="file_created",
                    file=event.src_path.removeprefix("./"),
                )

            case events.FileModifiedEvent:
                log_attrs = dict(
                    reason="file_modified",
                    file=event.src_path.removeprefix("./"),
                )

            case events.FileDeletedEvent:
                log_attrs = dict(
                    reason="file_deleted",
                    file=event.src_path.removeprefix("./"),
                )

            case events.FileMovedEvent:
                log_attrs = dict(
                    reason="file_moved",
                    src=event.src_path.removeprefix("./"),
                    dest=event.dest_path.removeprefix("./"),
                )

            case events.DirCreatedEvent:
                log_attrs = dict(
                    reason="dir_created",
                    dir=event.src_path.removeprefix("./"),
                )

            case events.DirModifiedEvent:
                log_attrs = dict(
                    reason="dir_modified",
                    dir=event.src_path.removeprefix("./"),
                )

            case events.DirDeletedEvent:
                log_attrs = dict(
                    reason="dir_deleted",
                    dir=event.src_path.removeprefix("./"),
                )

            case events.DirMovedEvent:
                log_attrs = dict(
                    reason="dir_moved",
                    src=event.src_path.removeprefix("./"),
                    dest=event.dest_path.removeprefix("./"),
                )

            case _:
                log_attrs = dict(reason="unknown")

        log.info("rebuilding site", **log_attrs)
        self._build()
        return None


class _Rebuilder:
    def __init__(self, config: Config, build_func: Callable) -> None:
        self._observer = _BuildObserver()
        self._observer.schedule(
            _BuildHandler(config, build_func),
            config.project_dir_rel,
            recursive=True,
        )  # type: ignore[no-untyped-call]

    def __enter__(self):  # type: ignore
        return self._observer.start()

    def __exit__(self, typ, value, traceback):  # type: ignore
        self._observer.stop()
        self._observer.join()