iterative/dvc

View on GitHub
dvc/stage/utils.py

Summary

Maintainability
A
45 mins
Test Coverage
import os
import pathlib
from typing import TYPE_CHECKING, Any, Optional, Union

from funcy import concat, first, lsplit, rpartial

from dvc.annotations import ANNOTATION_FIELDS
from dvc.exceptions import InvalidArgumentError
from dvc_data.hashfile.meta import Meta

from .exceptions import (
    MissingDataSource,
    StageExternalOutputsError,
    StagePathNotDirectoryError,
    StagePathNotFoundError,
    StagePathOutsideError,
)

if TYPE_CHECKING:
    from dvc.dependency import Dependency, ParamsDependency
    from dvc.repo import Repo

    from . import PipelineStage, Stage


def check_stage_path(repo, path, is_wdir=False):
    """Validate that ``path`` is an existing directory inside the repo.

    Args:
        repo: repository object; only its ``root_dir`` attribute is read.
        path: directory to validate (made absolute with ``os.path.abspath``).
        is_wdir: when true, error messages call the path a
            "stage working dir" instead of a "file path".

    Raises:
        StagePathNotFoundError: the path does not exist.
        StagePathNotDirectoryError: the path exists but is not a directory.
        StagePathOutsideError: the path is neither the repo root nor
            located inside it.
    """
    from dvc.utils.fs import path_isin

    assert repo is not None

    subject = "stage working dir" if is_wdir else "file path"

    def _message(problem):
        # Build the text in a single formatting pass. Running ``str.format``
        # a second time over a message that already contains the path would
        # treat any "{" or "}" in the path as a replacement field and fail
        # with KeyError/IndexError/ValueError instead of the stage error.
        return f"{subject} '{path}' {problem}"

    real_path = os.path.abspath(path)
    if not os.path.exists(real_path):
        raise StagePathNotFoundError(_message("does not exist"))

    if not os.path.isdir(real_path):
        raise StagePathNotDirectoryError(_message("is not directory"))

    proj_dir = os.path.abspath(repo.root_dir)
    # The repo root itself is accepted explicitly, next to anything below it.
    if real_path != proj_dir and not path_isin(real_path, proj_dir):
        raise StagePathOutsideError(_message("is outside of DVC repo"))


def fill_stage_outputs(stage, **kwargs):
    """Populate ``stage.outs`` from the output-related keyword arguments.

    Every recognised keyword maps to a list of output specs (missing keywords
    count as empty lists). The flags handed to ``loads_from`` are derived
    from the keyword's name: "no_cache" disables ``use_cache``, while
    "persist", "metrics" and "plots" enable the matching flag.

    The stage must not have any outputs yet.
    """
    from dvc.output import loads_from

    assert not stage.outs

    # Keywords are processed in this order, which fixes the order of
    # the entries appended to ``stage.outs``.
    ordered_keywords = (
        "outs_persist",
        "outs_persist_no_cache",
        "metrics",
        "metrics_persist",
        "metrics_no_cache",
        "metrics_persist_no_cache",
        "plots",
        "plots_persist",
        "plots_no_cache",
        "plots_persist_no_cache",
        "outs_no_cache",
        "outs",
    )

    stage.outs = []

    for keyword in ordered_keywords:
        flags = {
            "use_cache": "no_cache" not in keyword,
            "persist": "persist" in keyword,
            "metric": "metrics" in keyword,
            "plot": "plots" in keyword,
        }
        stage.outs += loads_from(stage, kwargs.get(keyword, []), **flags)


def fill_stage_dependencies(
    stage, deps=None, erepo=None, params=None, fs_config=None, db=None
):
    """Populate ``stage.deps`` with regular and parameter dependencies.

    Regular dependencies are loaded from ``deps`` (together with ``erepo``,
    ``fs_config`` and ``db``) and are followed by the parameter dependencies
    loaded from ``params``. Falsy ``deps``/``params`` are treated as empty
    lists. The stage must not have any dependencies yet.
    """
    from dvc.dependency import loads_from, loads_params

    assert not stage.deps

    dep_specs = deps or []
    param_specs = params or []

    stage.deps = []
    stage.deps += loads_from(
        stage,
        dep_specs,
        erepo=erepo,
        fs_config=fs_config,
        db=db,
    )
    stage.deps += loads_params(stage, param_specs)


def check_no_externals(stage):
    """Reject stages that have cached outputs outside of the repo.

    Raises:
        StageExternalOutputsError: at least one output has ``use_cache`` set
            while ``is_in_repo`` is false. The message lists those outputs
            and points to a documentation page, which differs for stages
            whose ``is_data_source`` is true.
    """
    from dvc.utils import format_link

    external = [
        str(out) for out in stage.outs if out.use_cache and not out.is_in_repo
    ]
    if not external:
        return

    if stage.is_data_source:
        url = "https://dvc.org/doc/user-guide/data-management/importing-external-data"
    else:
        url = (
            "https://dvc.org/doc/user-guide/pipelines/external-dependencies-and-outputs"
        )
    link = format_link(url)
    listed = ", ".join(external)

    raise StageExternalOutputsError(
        f"Cached output(s) outside of DVC project: {listed}. "
        f"See {link} for more info."
    )


def check_circular_dependency(stage):
    """Ensure no path is both a dependency and an output of ``stage``.

    Raises:
        CircularDependencyError: some ``fs_path`` appears in both
            ``stage.deps`` and ``stage.outs``; one such path is reported.
    """
    from dvc.exceptions import CircularDependencyError

    dep_paths = {dep.fs_path for dep in stage.deps}
    out_paths = {out.fs_path for out in stage.outs}

    overlap = dep_paths & out_paths
    if not overlap:
        return

    raise CircularDependencyError(str(overlap.pop()))


def check_duplicated_arguments(stage):
    """Ensure every dependency/output path of ``stage`` is listed only once.

    Raises:
        ArgumentDuplicationError: an ``fs_path`` occurs more than once among
            ``stage.deps`` and ``stage.outs`` combined; the first such path
            (in order of first appearance) is reported.
    """
    from collections import Counter

    from dvc.exceptions import ArgumentDuplicationError

    occurrences = Counter(entry.fs_path for entry in stage.deps + stage.outs)

    # Counter keeps first-seen order, so index 0 is the earliest offender.
    repeated = [path for path, count in occurrences.items() if count > 1]
    if repeated:
        raise ArgumentDuplicationError(str(repeated[0]))


def check_missing_outputs(stage):
    """Raise ``MissingDataSource`` naming every output that does not exist.

    Outputs whose ``exists`` attribute is falsy are collected as strings;
    nothing happens when all outputs exist.
    """
    missing = []
    for out in stage.outs:
        if out.exists:
            continue
        missing.append(str(out))

    if missing:
        raise MissingDataSource(missing)


def compute_md5(stage):
    """Return the md5 of the stage's dumped dictionary.

    The stage's own md5, meta and desc entries are dropped before hashing,
    and a number of further fields are passed to ``dict_md5`` as excluded.
    """
    from dvc.output import Output
    from dvc.utils import dict_md5

    dumped = stage.dumpd()

    # md5, meta and desc should not affect the stage md5.
    for field in (stage.PARAM_MD5, stage.PARAM_META, stage.PARAM_DESC):
        dumped.pop(field, None)

    # A wdir equal to the default value is ignored, so a DVC file without
    # wdir hashes the same as one that spells the default out. This matters
    # for backward compatibility with pipelines that didn't have WDIR in
    # their DVC files.
    if dumped.get(stage.PARAM_WDIR) == ".":
        del dumped[stage.PARAM_WDIR]

    excluded = [
        *ANNOTATION_FIELDS,
        stage.PARAM_LOCKED,  # backward compatibility
        stage.PARAM_FROZEN,
        Output.PARAM_METRIC,
        Output.PARAM_PERSIST,
        Meta.PARAM_ISEXEC,
        Meta.PARAM_SIZE,
        Meta.PARAM_NFILES,
    ]
    return dict_md5(dumped, exclude=excluded)


def resolve_wdir(wdir, path):
    """Express ``wdir`` relative to the directory containing ``path``.

    Returns the relative working dir in POSIX form, or ``None`` when it is
    the directory of ``path`` itself (i.e. the relative path is ".").
    """
    from dvc.utils import relpath

    relative = relpath(wdir, os.path.dirname(path))
    if relative == ".":
        return None
    return pathlib.PurePath(relative).as_posix()


def resolve_paths(fs, path, wdir=None):
    """Return absolute versions of ``path`` and of its working directory.

    ``wdir`` is interpreted relative to the directory containing ``path``;
    a falsy ``wdir`` stands for that directory itself. All path operations
    go through ``fs`` (``abspath``, ``join`` and ``dirname``).
    """
    abs_path = fs.abspath(path)
    relative_wdir = wdir if wdir else os.curdir
    parent = fs.dirname(abs_path)
    abs_wdir = fs.abspath(fs.join(parent, relative_wdir))
    return abs_path, abs_wdir


def get_dump(stage: "Stage", **kwargs):
    """Build the dictionary representation of ``stage``.

    ``kwargs`` are forwarded to the ``dumpd`` call of every dependency and
    output. Entries whose value is falsy are left out of the result.
    """
    fields = {
        stage.PARAM_DESC: stage.desc,
        stage.PARAM_MD5: stage.md5,
        stage.PARAM_CMD: stage.cmd,
        stage.PARAM_WDIR: resolve_wdir(stage.wdir, stage.path),
        stage.PARAM_FROZEN: stage.frozen,
        stage.PARAM_DEPS: [dep.dumpd(**kwargs) for dep in stage.deps],
        stage.PARAM_OUTS: [out.dumpd(**kwargs) for out in stage.outs],
        stage.PARAM_ALWAYS_CHANGED: stage.always_changed,
        stage.PARAM_META: stage.meta,
    }
    # Keep only the fields that carry a truthy value.
    return {name: content for name, content in fields.items() if content}


def split_params_deps(
    stage: "Stage",
) -> tuple[list["ParamsDependency"], list["Dependency"]]:
    """Split ``stage.deps`` into parameter and non-parameter dependencies.

    The first group holds the ``ParamsDependency`` instances, the second
    one everything else.
    """
    from dvc.dependency import ParamsDependency

    def _is_params_dep(dep):
        return isinstance(dep, ParamsDependency)

    return lsplit(_is_params_dep, stage.deps)


def is_valid_name(name: str) -> bool:
    """Tell whether ``name`` is free of ``INVALID_STAGENAME_CHARS``."""
    from . import INVALID_STAGENAME_CHARS

    used_chars = set(name)
    return used_chars.isdisjoint(INVALID_STAGENAME_CHARS)


def prepare_file_path(kwargs) -> str:
    """Derive the .dvc file name from the first output given in ``kwargs``.

    The output-related keys are searched in a fixed order; the base name of
    the first output found, with ``DVC_FILE_SUFFIX`` appended, is returned.
    At least one output is expected to be present.
    """
    from dvc.dvcfile import DVC_FILE_SUFFIX

    # Lookup order decides which output names the file.
    lookup_order = (
        "outs",
        "outs_no_cache",
        "metrics",
        "metrics_no_cache",
        "plots",
        "plots_no_cache",
        "outs_persist",
        "outs_persist_no_cache",
    )
    candidates = [kwargs.get(key, []) for key in lookup_order]

    out = first(concat(*candidates))
    assert out

    normalized = os.path.normpath(out)
    return os.path.basename(normalized) + DVC_FILE_SUFFIX


def check_stage_exists(repo: "Repo", stage: Union["Stage", "PipelineStage"], path: str):
    """Refuse to silently overwrite a stage that is already defined.

    Nothing happens when the file loaded from ``path`` does not exist.

    Raises:
        StageFileAlreadyExistsError: the file exists and ``stage`` is not a
            ``PipelineStage``.
        DuplicateStageName: ``stage`` is a ``PipelineStage`` whose name is
            already among the stages of the existing file.
    """
    from dvc.dvcfile import load_file
    from dvc.stage import PipelineStage
    from dvc.stage.exceptions import DuplicateStageName, StageFileAlreadyExistsError

    existing = load_file(repo, path)
    if not existing.exists():
        return

    overwrite_hint = "Use '--force' to overwrite."

    if isinstance(stage, PipelineStage):
        if stage.name and stage.name in existing.stages:
            raise DuplicateStageName(
                f"Stage '{stage.name}' already exists in '{stage.relpath}'. "
                f"{overwrite_hint}"
            )
        return

    raise StageFileAlreadyExistsError(
        f"'{stage.relpath}' already exists. {overwrite_hint}"
    )


def validate_kwargs(
    single_stage: bool = False, fname: Optional[str] = None, **kwargs
) -> dict[str, Any]:
    """Check the keyword arguments coming from the CLI and return them.

    For a single-stage invocation a stage name is rejected and the "name"
    key is removed from the result. Otherwise a command and a stage name
    are both required, and ``fname`` must not be given.

    Raises:
        InvalidArgumentError: the combination of arguments is not allowed.
    """
    name = kwargs.get("name")

    if single_stage:
        if name:
            raise InvalidArgumentError(
                "`-n|--name` is incompatible with `--single-stage`"
            )
        kwargs.pop("name", None)
        return kwargs

    # Pipeline stage: the command is checked first, then the name.
    if not kwargs.get("cmd"):
        raise InvalidArgumentError("command is not specified")
    if not name:
        raise InvalidArgumentError("`-n|--name` is required")
    if fname:
        raise InvalidArgumentError(
            "`--file` is currently incompatible with `-n|--name` "
            "and requires `--single-stage`"
        )

    return kwargs


def _get_stage_files(stage: "Stage") -> list[str]:
    """List relative paths of the files associated with ``stage``.

    The result starts with the stage's dvcfile (plus its lockfile when the
    dvcfile is a ``ProjectFile``). It continues with in-repo dependencies
    that do not use the SCM ignore and for which ``dvcfs.isdvc`` is false,
    and ends with in-repo outputs that do not use the SCM ignore.
    """
    from dvc.dvcfile import ProjectFile
    from dvc.utils import relpath

    dvcfile = stage.dvcfile
    files: list[str] = [dvcfile.relpath]
    if isinstance(dvcfile, ProjectFile):
        files.append(dvcfile._lockfile.relpath)

    def _is_dvc_path(dep) -> bool:
        dvcfs = stage.repo.dvcfs
        return dvcfs.isdvc(stage.repo.dvcfs.from_os_path(str(dep)))

    files.extend(
        relpath(dep.fs_path)
        for dep in stage.deps
        if not dep.use_scm_ignore and dep.is_in_repo and not _is_dvc_path(dep)
    )
    files.extend(
        relpath(out.fs_path)
        for out in stage.outs
        if not out.use_scm_ignore and out.is_in_repo
    )
    return files