iterative/dvc

View on GitHub
dvc/utils/strictyaml.py

Summary

Maintainability
B
5 hrs
Test Coverage
"""
This module combines schema and yaml parser into one, to provide better error
messages through a single entrypoint `load`.

Used for parsing dvc.yaml, dvc.lock and .dvc files.

Not to be confused with strictyaml, a Python library with similar motivations.
"""
import re
import typing
from contextlib import suppress
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, TypeVar

from dvc.exceptions import PrettyDvcException
from dvc.ui import ui
from dvc.utils.serialize import (
    EncodingError,
    YAMLFileCorruptedError,
    parse_yaml,
    parse_yaml_for_update,
)

if TYPE_CHECKING:
    from rich.syntax import Syntax
    from ruamel.yaml import StreamMark
    from voluptuous import MultipleInvalid

    from dvc.fs import FileSystem
    from dvc.ui import RichText


# Type variable tying a schema callable's input type to its output type
# in the signatures of `validate` and `load`.
_T = TypeVar("_T")
# Matches any line that begins with seven characters drawn from `<`, `=`, `>`
# followed by a space (the shape of `<<<<<<< ours` / `>>>>>>> theirs` lines).
# YAMLSyntaxError uses it to add a "possible merge conflicts" hint.
merge_conflict_marker = re.compile("^([<=>]{7}) .*$", re.MULTILINE)


def make_relpath(path: str) -> str:
    """Return the `relpath` form of `path` with an explicit leading marker.

    If the relative path does not start with "..", it is prefixed with
    "./" on posix systems and ".\\" everywhere else; otherwise it is returned
    unchanged.
    """
    import os

    from dvc.utils import relpath

    relative = relpath(path)
    if relative.startswith(".."):
        # already unambiguous: it climbs out of the starting directory
        return relative
    if os.name == "posix":
        return "./" + relative
    return ".\\" + relative


def _prepare_message(message: str) -> "RichText":
    """Wrap `message` in rich text rendered with the "red" style."""
    styled = ui.rich_text(message, style="red")
    return styled


def _prepare_cause(cause: str) -> "RichText":
    """Wrap `cause` in rich text rendered with the "bold" style."""
    styled = ui.rich_text(cause, style="bold")
    return styled


def _prepare_code_snippets(
    code: str, start_line: int = 1, **kwargs: Any
) -> "Syntax":
    """Build a rich `Syntax` renderable that highlights `code` as YAML.

    `start_line` is forwarded to `Syntax` unless the caller already supplied
    a `start_line` keyword; any other keyword arguments are passed through
    untouched.
    """
    from rich.syntax import Syntax

    if "start_line" not in kwargs:
        kwargs["start_line"] = start_line

    snippet = Syntax(
        code,
        "yaml",
        theme="ansi_dark",
        word_wrap=True,
        line_numbers=True,
        indent_guides=True,
        **kwargs,
    )
    return snippet


class YAMLSyntaxError(PrettyDvcException, YAMLFileCorruptedError):
    """Syntax error found while parsing a YAML file.

    Attributes:
        path: path of the offending file.
        yaml_text: raw text that failed to parse.
        exc: the exception raised by the parser wrapper; its `__cause__` is
            inspected when pretty-printing.
        hint: " (possible merge conflicts)" when the text contains a line
            matching `merge_conflict_marker`, otherwise "".
        rev: optional revision the file was read from.
    """

    def __init__(
        self,
        path: str,
        yaml_text: str,
        exc: Exception,
        rev: Optional[str] = None,
    ) -> None:
        self.path: str = path
        self.yaml_text: str = yaml_text
        self.exc: Exception = exc

        merge_conflicts = merge_conflict_marker.search(self.yaml_text)
        self.hint = " (possible merge conflicts)" if merge_conflicts else ""
        self.rev: Optional[str] = rev
        super().__init__(self.path)

    def __pretty_exc__(self, **kwargs: Any) -> None:
        """Write a formatted description of the syntax error to the ui.

        Prints the headline ("'<path>' is invalid...") followed by the
        context/problem messages and the offending source lines taken from
        the marks of the underlying `MarkedYAMLError`.

        Raises:
            ValueError: if `self.exc.__cause__` is not a `MarkedYAMLError`,
                i.e. there is no location information to print.
        """
        from ruamel.yaml.error import MarkedYAMLError

        exc = self.exc.__cause__

        if not isinstance(exc, MarkedYAMLError):
            raise ValueError("nothing to pretty-print here. :)")

        source = self.yaml_text.splitlines()

        def prepare_linecol(mark: "StreamMark") -> str:
            # marks are 0-based; report 1-based positions
            return f"in line {mark.line + 1}, column {mark.column + 1}"

        def prepare_message(
            message: str, mark: Optional["StreamMark"] = None
        ) -> "RichText":
            # Only append the location when a mark is available, so that a
            # message without a mark does not end in a dangling ", ".
            parts = [message.capitalize()]
            if mark:
                parts.append(prepare_linecol(mark))
            return _prepare_cause(", ".join(parts))

        def prepare_code(mark: "StreamMark") -> "Syntax":
            line = mark.line + 1
            # the mark may point past the last line (e.g. error at EOF)
            code = "" if line > len(source) else source[line - 1]
            return _prepare_code_snippets(code, line)

        lines: List[object] = []
        if hasattr(exc, "context"):
            if exc.context_mark is not None:
                lines.append(
                    prepare_message(str(exc.context), exc.context_mark)
                )
                # Show the context snippet only when it points somewhere
                # other than the problem itself, to avoid printing the same
                # line twice.
                if (
                    exc.problem is None
                    or exc.problem_mark is None
                    or exc.context_mark.name != exc.problem_mark.name
                    or exc.context_mark.line != exc.problem_mark.line
                    or exc.context_mark.column != exc.problem_mark.column
                ):
                    lines.extend([prepare_code(exc.context_mark), ""])
            if exc.problem is not None:
                lines.append(
                    prepare_message(str(exc.problem), exc.problem_mark)
                )
            if exc.problem_mark is not None:
                lines.append(prepare_code(exc.problem_mark))

        if lines:
            # we should not add a newline after the main message
            # if there are no other outputs
            lines.insert(0, "")

        rel = make_relpath(self.path)
        rev_msg = f" in revision '{self.rev[:7]}'" if self.rev else ""
        msg_fmt = f"'{rel}' is invalid{self.hint}{rev_msg}."
        lines.insert(0, _prepare_message(msg_fmt))
        for line in lines:
            ui.error_write(line, styled=True)


def determine_linecol(
    data, paths, max_steps=5
) -> Tuple[Optional[int], Optional[int], int]:
    """Find a 1-based (line, column) for the item at `paths` inside `data`.

    The value at `paths` is looked up in `data`; when it exposes an `.lc`
    attribute (as the collection types produced by `ruamel.yaml` do), the
    line and column are read from it.

    Non-collection values carry no `lc`. For instance, with `paths` being
    ['stages', 'metrics'] and
    ```
    stages:
      metrics: true
    ```
    the boolean has no position information, so the last path component is
    dropped and the lookup is retried on ['stages'], which is a collection.

    Every such retry increments `step`; callers can use it to decide how
    many extra lines of code to show, since the position gets less precise
    the further up we had to go. The walk stops when the path is exhausted
    or once `step` reaches `max_steps`.

    Returns a `(line, col, step)` tuple; `line` and `col` are `None` when no
    position could be determined.
    """
    from dpath.util import get

    line: Optional[int] = None
    col: Optional[int] = None
    step = 1

    while paths and step < max_steps:
        node = get(data, paths, default=None)
        if node is not None:
            try:
                line = node.lc.line + 1
                col = node.lc.col + 1
            except (AttributeError, TypeError):
                # no usable position on this node; try its parent instead
                pass
            else:
                break
        step += 1
        paths = list(paths)[:-1]

    return line, col, step


class YAMLValidationError(PrettyDvcException):
    """Schema validation failure for a YAML document.

    Wraps the `MultipleInvalid` raised by the schema together with the
    file path and raw text, so the individual errors can be printed next to
    the lines of the document they refer to.
    """

    def __init__(
        self,
        exc: "MultipleInvalid",
        path: str = None,
        text: str = None,
        rev: str = None,
    ) -> None:
        self.text = text or ""
        self.exc = exc

        rel = make_relpath(path) if path else ""
        self.path = path or ""

        error_count = len(self.exc.errors)
        pieces = [f"'{rel}' validation failed"]
        if rev:
            pieces.append(f" in revision '{rev[:7]}'")
        if error_count > 1:
            pieces.append(f": {error_count} errors")
        super().__init__("".join(pieces))

    def _prepare_context(self, data: typing.Mapping) -> List[object]:
        """Build the renderables describing each validation error.

        For every error a bold one-line cause is produced and, when a line
        number could be determined from `data`, a snippet of `self.text`
        around that line.
        """
        output: List[object] = []
        for position, err in enumerate(self.exc.errors):
            # separate consecutive errors with a blank line
            if position > 0 and output[-1]:
                output.append("")

            line, col, step = determine_linecol(data, err.path)

            described = [err.error_message]
            if err.path:
                described.append("in " + " -> ".join(map(str, err.path)))
            if line:
                described.append(f"line {line}")
            if col:
                described.append(f"column {col}")
            output.append(_prepare_cause(", ".join(described)))

            if not line:
                continue

            # we show one line above the error
            # we try to show few more lines if we could not
            # reliably figure out where the error was
            window = (line - 1, line + step - 1)
            output.append(
                _prepare_code_snippets(self.text, line_range=window)
            )
        return output

    def __pretty_exc__(self, **kwargs: Any) -> None:
        """Write the validation failure, with context if any, to the ui."""
        from collections.abc import Mapping

        parsed = parse_yaml_for_update(self.text, self.path)
        output: List[object] = []
        if isinstance(parsed, Mapping):
            output.extend(self._prepare_context(parsed))

        if output:
            # we should not add a newline after the main message
            # if there are no other outputs
            cause = ""
            output.insert(0, "")
        else:
            # if we don't have any context to show, we'll fallback to what we
            # got from voluptuous and print them in the same line.
            cause = f": {self.exc}"

        output.insert(0, _prepare_message(f"{self}{cause}."))
        for renderable in output:
            ui.error_write(renderable, styled=True)


def validate(
    data: _T,
    schema: Callable[[_T], _T],
    text: str = None,
    path: str = None,
    rev: str = None,
) -> _T:
    """Run `schema` on `data` and return whatever the schema returns.

    A `MultipleInvalid` raised by the schema is converted into a
    `YAMLValidationError` carrying `path`, `text` and `rev`, chained to the
    original exception.
    """
    from voluptuous import MultipleInvalid

    try:
        validated = schema(data)
    except MultipleInvalid as exc:
        raise YAMLValidationError(exc, path, text, rev=rev) from exc
    return validated


def load(
    path: str,
    schema: Callable[[_T], _T] = None,
    fs: "FileSystem" = None,
    encoding: str = "utf-8",
    round_trip: bool = False,
) -> Any:
    """Read, parse and optionally validate the YAML file at `path`.

    The file is opened through `fs.open` when `fs` is given, otherwise
    through the builtin `open`. It is parsed with the "rt" parser type when
    `round_trip` is set and with "safe" otherwise.

    Returns:
        A `(data, text)` tuple: the parsed document and the raw file text.
        When `schema` is given the data is validated, but the parsed
        (unvalidated) object is what gets returned.

    Raises:
        EncodingError: if the file cannot be decoded with `encoding`.
        YAMLSyntaxError: if parsing raises `YAMLFileCorruptedError`.
        YAMLValidationError: if `schema` rejects the parsed data.
    """
    if fs:
        open_fn = fs.open
    else:
        open_fn = open
    rev = getattr(fs, "rev", None)
    parser_type = "rt" if round_trip else "safe"

    try:
        with open_fn(path, encoding=encoding) as stream:  # type: ignore
            text = stream.read()
        data = parse_yaml(text, path, typ=parser_type)
    except UnicodeDecodeError as exc:
        raise EncodingError(path, encoding) from exc
    except YAMLFileCorruptedError as exc:
        underlying = exc.__cause__
        raise YAMLSyntaxError(path, text, exc, rev=rev) from underlying

    if schema:
        # not returning validated data, as it may remove
        # details from CommentedMap that we get from roundtrip parser
        validate(data, schema, text=text, path=path, rev=rev)
    return data, text