volt/site.py
"""Site-level functions and classes."""
# Copyright (c) 2012-2023 Wibowo Arindrarto <contact@arindrarto.dev>
# SPDX-License-Identifier: BSD-3-Clause
import fnmatch
import os
import shutil
import tempfile
from functools import cached_property
from itertools import filterfalse, tee
from pathlib import Path
from types import ModuleType
from typing import (
cast,
Any,
Callable,
Dict,
Generator,
Iterator,
Literal,
Optional,
Sequence,
TypeVar,
)
import structlog
from . import constants, signals
from .config import Config
from .engines import Engine, MarkdownEngine
from .error import VoltResourceError
from .outputs import CopyOutput, Output, TemplateOutput
from .theme import Theme
from ._import import import_file
from ._logging import log_method
__all__ = ["Site"]
log = structlog.get_logger(__name__)
class _PlanNode:
"""Node of the :class:`_Plan` tree."""
__slots__ = ("path", "output", "children", "__dict__")
def __init__(self, path: Path, output: Optional[Output] = None) -> None:
"""Initialize a plan node.
:param path: Path to the node.
:param output: A file to be created in the site output directory. If set to
``None``, represents a directory. Otherwise, the given value must be
an :class:`Output` instance.
"""
self.path = path
self.output = output
self.children: Optional[Dict[str, _PlanNode]] = (
None if output is not None else {}
)
@cached_property
def is_dir(self) -> bool:
"""Whether the node represents a directory or not."""
return self.output is None
def __contains__(self, value: str) -> bool:
children = self.children or {}
return self.is_dir and value in children
def __iter__(self) -> Iterator["_PlanNode"]:
if not self.is_dir:
return iter([])
children = self.children or {}
return iter(children.values())
def create(self, build_dir: Path) -> None:
"""Write the node to the filesystem.
If the node represents a directory, the directory and its parents will
be created. If it represents a file, the file will be written. The
latter assumes that all parent directories of the file already exists.
"""
if self.output is None:
(build_dir / self.path).mkdir(parents=True, exist_ok=True)
return None
self.output.write(build_dir=build_dir)
return None
def add_child(self, key: str, output: Optional[Output] = None) -> None:
"""Add a child to the node.
If a child with the given key already exists, nothing is done.
:param str key: Key to given child.
:param output: A file to be created in the site output directory.
If set to ``None``, represents a directory. Otherwise, the given
value must be a subclass of :class:`Output`.
:raises TypeError: if the node represents a directory (does not have
any children).
"""
if not self.is_dir:
raise TypeError("cannot add children to file node")
# TODO: Adjustable behavior for outputs with the same dest? For now
# just take the first one.
children = self.children or {}
if key in children:
return
children[key] = _PlanNode(self.path / key, output)
self.children = children
class _Plan:
"""The file and directory layout of the final built site.
A plan is essentially an n-ary tree whose nodes represent either directories or
files to be created.
"""
def __init__(self) -> None:
"""Initialize a plan."""
out_relpath = Path()
self.out_relpath = out_relpath
self._root = _PlanNode(out_relpath)
self._root_path_len = len(out_relpath.parts)
def add_output(self, output: Output) -> None:
"""Add an output to the plan.
:param output: A file to be created in the site output directory.
:param src_path: The input file used to create the output, if applicable.
:raises ValueError:
* when the given output's destination path is not a path relative to
the working directory.
* when the given output's destination path does not start with the
project site destination path.
* when the given output's destination path conflicts with an
existing one
"""
# Ensure output dest starts with project site_dest
prefix_len = self._root_path_len
if output.url_parts[:prefix_len] != self._root.path.parts:
raise ValueError(
"output destination does not start with project site destination"
)
rem_len = len(output.url_parts) - prefix_len
cur = self._root
for idx, p in enumerate(output.url_parts[prefix_len:], start=1):
try:
if idx < rem_len:
cur.add_child(p)
cur = cast(Dict[str, _PlanNode], cur.children)[p]
else:
if p in cur:
raise ValueError(
f"output path {output.url!r}"
+ (
f" from input {str(src)!r}"
if (src := getattr(output, "src", None)) is not None
else ""
)
+ " already added to the plan"
)
cur.add_child(p, output)
except TypeError:
raise ValueError(
f"path of output item {str(cur.path / p)!r}"
f" conflicts with {str(cur.path)!r}"
) from None
return None
def fnodes(self) -> Generator[_PlanNode, None, None]:
"""Yield all file output nodes, depth-first."""
# TODO: Maybe compress the paths so we don't have to iterate over all
# directory parts?
nodes = [self._root]
while nodes:
cur = nodes.pop()
nodes.extend(iter(cur))
if not cur.is_dir:
yield cur
def dnodes(self) -> Generator[_PlanNode, None, None]:
"""Yield the least number of directory nodes required to construct
the site.
In other words, yields nodes whose children all represent file outputs.
"""
nodes = [self._root]
while nodes:
cur = nodes.pop()
children = list(iter(cur))
fnodes = [c for c in children if not c.is_dir]
if children and len(fnodes) == len(children):
yield cur
else:
nodes.extend(children)
def write_nodes(self, build_dir: Path) -> None:
"""Write the plan nodes according to the plan under the given parent
directory."""
for dn in self.dnodes():
dn.create(build_dir=build_dir)
for fn in self.fnodes():
fn.create(build_dir=build_dir)
return None
class Site:
"""The static site."""
def __init__(self, config: Config) -> None:
"""Initialize the static site for building.
:param config: The validated site configuration.
"""
self.__build_dir: Optional[Path] = None
self.__hooks: dict[str, ModuleType] = {}
self.config = config
self.outputs = list[Output]()
self.engine: Optional[Engine] = None
self.theme = Theme.from_config(config)
signals.send(signals.post_site_load_theme, site=self)
def __repr__(self) -> str:
config = self.config
return f"{self.__class__.__name__}(name={config.name!r}, url={config.url!r})"
@property
def build_dir(self) -> Optional[Path]:
"""Build directory, set only just before the site is written to disk."""
return self.__build_dir
@log_method(with_args=True)
def build(
self,
with_draft: bool,
clean: bool,
build_dir_prefix: str = constants.BUILD_DIR_PREFIX,
) -> None:
"""Build the static site in the destination directory."""
try:
self.__build(
with_draft=with_draft,
clean=clean,
build_dir_prefix=build_dir_prefix,
)
finally:
self.__hooks = {}
signals._clear_site_signal_receivers()
def has_output(self, pattern: str) -> bool:
return (
next(
(item for item in self.outputs if fnmatch.fnmatch(item.url, pattern)),
None,
)
is not None
)
def select_outputs(self, pattern: str) -> list[Output]:
return [item for item in self.outputs if fnmatch.fnmatch(item.url, pattern)]
def extract_outputs(self, pattern: str) -> list[Output]:
matching, rest = _partition_outputs(
self.outputs,
lambda t: fnmatch.fnmatch(t.url, pattern),
)
rv = list(matching)
self.outputs = list(rest)
return rv
@log_method(with_args=True)
def __build(self, with_draft: bool, clean: bool, build_dir_prefix: str) -> None:
"""Build the static site in the destination directory."""
self.__load_hooks()
self.__load_engine()
signals.send(signals.post_site_load_engines, site=self)
self.__collect_outputs(with_draft=with_draft)
signals.send(signals.post_site_collect_outputs, site=self)
self.__update_render_kwargs(site=self, config=self.config, theme=self.theme)
with tempfile.TemporaryDirectory(prefix=build_dir_prefix) as tmp_dir_name:
build_dir = Path(tmp_dir_name)
self.__build_dir = build_dir
signals.send(signals.pre_site_write, site=self)
self.__write(build_dir=build_dir, clean=clean)
signals.send(signals.post_site_write, site=self)
log.debug("removing build dir", path=build_dir)
return None
@log_method
def __load_hooks(self) -> None:
self.__load_hook("theme")
self.__load_hook("project")
return None
@log_method
def __load_hook(self, kind: Literal["project", "theme"]) -> None:
config = self.config
fp = config.hooks_module_path
name = config.hooks_module_name
theme = self.theme
if kind == "theme":
fp = theme.hooks_module_path
name = theme.hooks_module_name
log.debug(f"checking if {kind} hooks extension is present")
if not fp.exists():
log.debug(f"found no {kind} hooks extension")
return None
log.debug(f"loading {kind} hooks extension", path=fp, name=name)
# NOTE: keeping a reference to the imported module to avoid garbage
# cleanup that would remove hooks.
self.__hooks[kind] = import_file(fp, name)
log.debug(f"loaded {kind} hooks extension")
return None
@log_method
def __load_engine(self) -> None:
self.engine = (
spec.load()
if (spec := self.theme.get_engine_spec()) is not None
else MarkdownEngine(config=self.config, theme=self.theme)
)
@log_method
def __prepare_static_outputs(self, with_draft: bool) -> list[Output]:
config = self.config
theme = self.theme
outputs = {
output.url: output
for output in _collect_copy_outputs(theme.static_dir, config.invoc_dir)
}
for user_output in _collect_copy_outputs(config.static_dir, config.invoc_dir):
url = user_output.url
if url in outputs:
log.warn(
"overwriting theme static file with user-defined static file",
url=url,
)
outputs[url] = user_output
if with_draft:
for draft_output in _collect_copy_outputs(
config.draft_static_dir,
config.invoc_dir,
):
url = draft_output.url
if url in outputs:
log.warn(
"overwriting static file with its draft version",
url=url,
)
outputs[url] = user_output
return list(outputs.values())
@log_method
def __collect_outputs(self, with_draft: bool) -> None:
if self.engine is None:
return None
self.outputs = [
*self.__prepare_static_outputs(with_draft),
*self.engine.prepare_outputs(with_draft),
]
return None
@log_method(with_args=True)
def __write(self, build_dir: Path, clean: bool) -> None:
"""Write all collected outputs under the destination directory."""
plan = _Plan()
for output in self.outputs:
try:
plan.add_output(output)
except ValueError as e:
raise VoltResourceError(f"{e}") from e
plan.write_nodes(build_dir=build_dir)
output_dir = self.config.output_dir
if clean:
shutil.rmtree(output_dir, ignore_errors=True)
shutil.copytree(src=build_dir, dst=output_dir, dirs_exist_ok=not clean)
# chmod if inside container to ensure host can use it as if not generated
# from inside the container.
if self.config.in_docker:
for dp, _, file_names in os.walk(output_dir):
os.chmod(dp, 0o777) # nosec: B103
for fn in file_names:
os.chmod(os.path.join(dp, fn), 0o666) # nosec: B103
return None
def __update_render_kwargs(self, **kwargs: Any) -> None:
for output in self.outputs:
if not isinstance(output, TemplateOutput):
continue
output.render_kwargs.update(**kwargs)
T = TypeVar("T")
def _partition_outputs(
outputs: Sequence[T],
pred: Callable[[T], bool],
) -> tuple[Iterator[T], Iterator[T]]:
iter1, iter2 = tee(outputs)
matching = filter(pred, iter1)
rest = filterfalse(pred, iter2)
return matching, rest
def _calc_relpath(output: Path, ref: Path) -> Path:
"""Calculate the output's path relative to the reference.
:param output: The path to which the relative path will point.
:param ref: Reference path.
:returns: The relative path from ``ref`` to ``to``.
:raises ValueError: when one of the given input paths is not an absolute
path.
"""
ref = ref.expanduser()
output = output.expanduser()
if not ref.is_absolute() or not output.is_absolute():
raise ValueError("could not compute relative paths of non-absolute input paths")
common = Path(os.path.commonpath([ref, output]))
common_len = len(common.parts)
ref_uniq = ref.parts[common_len:]
output_uniq = output.parts[common_len:]
rel_parts = ("..",) * (len(ref_uniq)) + output_uniq
return Path(*rel_parts)
def _collect_copy_outputs(start_dir: Path, invocation_dir: Path) -> list[CopyOutput]:
"""Gather files from the given start directory recursively as copy outputs."""
src_relpath = _calc_relpath(start_dir, invocation_dir)
src_rel_len = len(src_relpath.parts)
outputs: list[CopyOutput] = []
try:
entries = list(os.scandir(src_relpath))
except FileNotFoundError:
return outputs
else:
while entries:
de = entries.pop()
if de.is_dir():
entries.extend(os.scandir(de))
else:
dtoks = Path(de.path).parts[src_rel_len:]
outputs.append(CopyOutput(src=Path(de.path), url_parts=dtoks))
return outputs