View on GitHub


5 hrs
Test Coverage
This module combines schema and yaml parser into one, to provide better error
messages through a single entrypoint `load`.

Used for parsing dvc.yaml, dvc.lock and .dvc files.

Not to be confused with strictyaml, a python library with similar motivations.
import re
import typing
from contextlib import suppress
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, TypeVar

from dvc.exceptions import PrettyDvcException
from dvc.ui import ui
from dvc.utils.serialize import (

    from rich.syntax import Syntax
    from ruamel.yaml import StreamMark
    from voluptuous import MultipleInvalid

    from dvc.fs import FileSystem
    from dvc.ui import RichText

_T = TypeVar("_T")
merge_conflict_marker = re.compile("^([<=>]{7}) .*$", re.MULTILINE)

def make_relpath(path: str) -> str:
    import os

    from dvc.utils import relpath

    rel = relpath(path)
    prefix = ""
    if not rel.startswith(".."):
        prefix = "./" if == "posix" else ".\\"
    return prefix + rel

def _prepare_message(message: str) -> "RichText":
    return ui.rich_text(message, style="red")

def _prepare_cause(cause: str) -> "RichText":
    return ui.rich_text(cause, style="bold")

def _prepare_code_snippets(code: str, start_line: int = 1, **kwargs: Any) -> "Syntax":
    from rich.syntax import Syntax

    kwargs.setdefault("start_line", start_line)
    return Syntax(

class YAMLSyntaxError(PrettyDvcException, YAMLFileCorruptedError):
    def __init__(
        path: str,
        yaml_text: str,
        exc: Exception,
        rev: Optional[str] = None,
    ) -> None:
        self.path: str = path
        self.yaml_text: str = yaml_text
        self.exc: Exception = exc

        merge_conflicts =
        self.hint = " (possible merge conflicts)" if merge_conflicts else ""
        self.rev: Optional[str] = rev

    def __pretty_exc__(self, **kwargs: Any) -> None:  # noqa: C901
        from ruamel.yaml.error import MarkedYAMLError

        exc = self.exc.__cause__

        if not isinstance(exc, MarkedYAMLError):
            raise ValueError("nothing to pretty-print here.")  # noqa: TRY004

        source = self.yaml_text.splitlines()

        def prepare_linecol(mark: "StreamMark") -> str:
            return f"in line {mark.line + 1}, column {mark.column + 1}"

        def prepare_message(
            message: str, mark: Optional["StreamMark"] = None
        ) -> "RichText":
            cause = ", ".join(
                [message.capitalize(), prepare_linecol(mark) if mark else ""]
            return _prepare_cause(cause)

        def prepare_code(mark: "StreamMark") -> "Syntax":
            line = mark.line + 1
            code = "" if line > len(source) else source[line - 1]
            return _prepare_code_snippets(code, line)

        lines: List[object] = []
        if hasattr(exc, "context"):
            if exc.context_mark is not None:
                lines.append(prepare_message(str(exc.context), exc.context_mark))
            if exc.context_mark is not None and (
                exc.problem is None
                or exc.problem_mark is None
                or !=
                or exc.context_mark.line != exc.problem_mark.line
                or exc.context_mark.column != exc.problem_mark.column
                lines.extend([prepare_code(exc.context_mark), ""])
            if exc.problem is not None:
                lines.append(prepare_message(str(exc.problem), exc.problem_mark))
            if exc.problem_mark is not None:

        if lines:
            # we should not add a newline after the main message
            # if there are no other outputs
            lines.insert(0, "")

        rel = make_relpath(self.path)
        rev_msg = f" in revision '{self.rev[:7]}'" if self.rev else ""
        msg_fmt = f"'{rel}' is invalid{self.hint}{rev_msg}."
        lines.insert(0, _prepare_message(msg_fmt))
        for line in lines:
            ui.error_write(line, styled=True)

def determine_linecol(
    data, paths, max_steps=5
) -> Tuple[Optional[int], Optional[int], int]:
    """Determine linecol from the CommentedMap for the `paths` location.

    CommentedMap from `ruamel.yaml` has `.lc` property from which we can read
    `.line` and `.col`. This is available in the collections type,
    i.e. list and dictionaries.

    But this may fail on non-collection types. For example, if the `paths` is
    ['stages', 'metrics'], metrics being a boolean type does not have `lc`
      metrics: true

    To provide some context to the user, we step up to the
    path ['stages'], which being a collection type, will have `lc` prop
    with which we can find line and col.

    This may end up being not accurate, so we try to show the same amount of
    lines of code for `n` number of steps taken upwards. In a worst case,
    it may be just 1 step (as non-collection item cannot have child items),
    but `schema validator` may provide us arbitrary path. So, this caps the
    number of steps upward to just 5. If it does not find any linecols, it'll
    from dpath import get

    step = 1
    line, col = None, None
    while paths and step < max_steps:
        value = get(data, paths, default=None)
        if value is not None:
            with suppress(AttributeError, TypeError):
                line = + 1
                col = + 1
        step += 1
        *paths, _ = paths

    return line, col, step

class YAMLValidationError(PrettyDvcException):
    def __init__(
        exc: "MultipleInvalid",
        path: Optional[str] = None,
        text: Optional[str] = None,
        rev: Optional[str] = None,
    ) -> None:
        self.text = text or ""
        self.exc = exc

        rel = make_relpath(path) if path else ""
        self.path = path or ""

        message = f"'{rel}' validation failed"
        message += f" in revision '{rev[:7]}'" if rev else ""
        if len(self.exc.errors) > 1:
            message += f": {len(self.exc.errors)} errors"

    def _prepare_context(self, data: typing.Mapping) -> List[object]:
        lines: List[object] = []
        for index, error in enumerate(self.exc.errors):
            if index and lines[-1]:
            line, col, step = determine_linecol(data, error.path)
            parts = [error.error_message]
            if error.path:
                parts.append("in " + " -> ".join(str(p) for p in error.path))
            if line:
                parts.append(f"line {line}")
            if col:
                parts.append(f"column {col}")
            lines.append(_prepare_cause(", ".join(parts)))

            if line:
                # we show one line above the error
                # we try to show few more lines if we could not
                # reliably figure out where the error was
                lr = (line - 1, line + step - 1)
                code = _prepare_code_snippets(self.text, line_range=lr)
        return lines

    def __pretty_exc__(self, **kwargs: Any) -> None:
        """Prettify exception message."""
        from import Mapping

        lines: List[object] = []
        data = parse_yaml_for_update(self.text, self.path)
        if isinstance(data, Mapping):

        cause = ""
        if lines:
            # we should not add a newline after the main message
            # if there are no other outputs
            lines.insert(0, "")
            # if we don't have any context to show, we'll fallback to what we
            # got from voluptuous and print them in the same line.
            cause = f": {self.exc}"

        lines.insert(0, _prepare_message(f"{self}{cause}."))
        for line in lines:
            ui.error_write(line, styled=True)

def validate(
    data: _T,
    schema: Callable[[_T], _T],
    text: Optional[str] = None,
    path: Optional[str] = None,
    rev: Optional[str] = None,
) -> _T:
    from voluptuous import MultipleInvalid

        return schema(data)
    except MultipleInvalid as exc:
        raise YAMLValidationError(exc, path, text, rev=rev) from exc

def load(
    path: str,
    schema: Optional[Callable[[_T], _T]] = None,
    fs: Optional["FileSystem"] = None,
    encoding: str = "utf-8",
    round_trip: bool = False,
) -> Any:
    open_fn = if fs else open
    rev = getattr(fs, "rev", None)

        with open_fn(path, encoding=encoding) as fd:  # type: ignore[operator]
            text =
        data = parse_yaml(text, path, typ="rt" if round_trip else "safe")
    except UnicodeDecodeError as exc:
        raise EncodingError(path, encoding) from exc
    except YAMLFileCorruptedError as exc:
        cause = exc.__cause__
        raise YAMLSyntaxError(path, text, exc, rev=rev) from cause

    if schema:
        # not returning validated data, as it may remove
        # details from CommentedMap that we get from roundtrip parser
        validate(data, schema, text=text, path=path, rev=rev)
    return data, text