iterative/dvc

View on GitHub
dvc/commands/data_sync.py

Summary

Maintainability
C
7 hrs
Test Coverage
import argparse

from dvc.cli import completion, formatter
from dvc.cli.command import CmdBase
from dvc.cli.utils import append_doc_link
from dvc.log import logger

# Rebind the imported ``logger`` to a child logger named after this module,
# so the log calls below go through the module-specific logger.
logger = logger.getChild(__name__)


class CmdDataBase(CmdBase):
    """Common base for the pull/push/fetch command classes in this module."""

    def log_summary(self, stats):
        """Write a summary of ``stats`` to the UI.

        A warning is emitted first when neither ``self.args.remote`` nor the
        ``core.remote`` config entry is set.

        Args:
            stats: mapping whose ``items()`` are handed to ``get_summary``.
                When the resulting summary is falsy, a fixed
                "Everything is up to date." message is written instead.
        """
        from dvc.ui import ui
        from dvc.utils.humanize import get_summary

        # ``or`` short-circuits: the repo config is only consulted when no
        # remote was given on the command line.
        remote = self.args.remote or self.repo.config["core"].get("remote")
        if not remote:
            ui.warn("No remote provided and no default remote set.")

        message = get_summary(stats.items())
        if not message:
            message = "Everything is up to date."
        ui.write(message)


class CmdDataPull(CmdDataBase):
    """Command class that runs ``self.repo.pull`` and reports the result."""

    def log_summary(self, stats):
        """Log the changes in ``stats`` via ``log_changes``, then summarize."""
        from dvc.commands.checkout import log_changes

        log_changes(stats)
        super().log_summary(stats)

    def run(self):
        """Call ``self.repo.pull`` with the parsed CLI options.

        Returns:
            0 on success; 1 when ``CheckoutError`` or ``DvcException`` is
            raised (the failure is logged with its traceback).
        """
        from dvc.exceptions import CheckoutError, DvcException

        # Each name is both an attribute of ``self.args`` and the keyword
        # under which its value is forwarded to ``repo.pull``.
        option_names = (
            "targets",
            "jobs",
            "remote",
            "all_branches",
            "all_tags",
            "all_commits",
            "with_deps",
            "force",
            "recursive",
            "run_cache",
            "glob",
            "allow_missing",
        )

        try:
            pull = self.repo.pull
            options = {name: getattr(self.args, name) for name in option_names}
            result = pull(**options)
            self.log_summary(result)
        except (CheckoutError, DvcException) as exc:
            # If the exception exposes a non-empty ``stats`` attribute,
            # summarize it before reporting the failure.
            partial = getattr(exc, "stats", {})
            if partial:
                self.log_summary(partial)
            logger.exception("failed to pull data from the cloud")
            return 1

        return 0


class CmdDataPush(CmdDataBase):
    """Command class that runs ``self.repo.push`` and reports the result."""

    def run(self):
        """Call ``self.repo.push`` with the parsed CLI options.

        The value returned by ``repo.push`` is summarized under the
        ``"pushed"`` key.

        Returns:
            0 on success; 1 when ``DvcException`` is raised (the failure is
            logged with its traceback).
        """
        from dvc.exceptions import DvcException

        # Each name is both an attribute of ``self.args`` and the keyword
        # under which its value is forwarded to ``repo.push``.
        option_names = (
            "targets",
            "jobs",
            "remote",
            "all_branches",
            "all_tags",
            "all_commits",
            "with_deps",
            "recursive",
            "run_cache",
            "glob",
        )

        try:
            push = self.repo.push
            options = {name: getattr(self.args, name) for name in option_names}
            pushed = push(**options)
            self.log_summary({"pushed": pushed})
        except DvcException:
            logger.exception("failed to push data to the cloud")
            return 1
        return 0


class CmdDataFetch(CmdDataBase):
    """Command class that runs ``self.repo.fetch`` and reports the result."""

    def run(self):
        """Call ``self.repo.fetch`` with the parsed CLI options.

        The value returned by ``repo.fetch`` is summarized under the
        ``"fetched"`` key.

        Returns:
            0 on success; 1 when ``DvcException`` is raised (the failure is
            logged with its traceback).
        """
        from dvc.exceptions import DvcException

        # Each name is both an attribute of ``self.args`` and the keyword
        # under which its value is forwarded to ``repo.fetch``.
        option_names = (
            "targets",
            "jobs",
            "remote",
            "all_branches",
            "all_tags",
            "all_commits",
            "with_deps",
            "recursive",
            "run_cache",
            "max_size",
            "types",
        )

        try:
            fetch = self.repo.fetch
            options = {name: getattr(self.args, name) for name in option_names}
            fetched = fetch(**options)
            self.log_summary({"fetched": fetched})
        except DvcException:
            logger.exception("failed to fetch data from the cloud")
            return 1
        return 0


def shared_parent_parser():
    """Build the parent parser shared by the pull/push/fetch/status commands.

    The parser extends the one returned by ``get_parent_parser()`` with a
    ``-j/--jobs`` option and a variadic ``targets`` positional.

    Returns:
        A fresh ``argparse.ArgumentParser`` created with ``add_help=False``.
    """
    from dvc.cli.parser import get_parent_parser

    base = get_parent_parser()
    parser = argparse.ArgumentParser(add_help=False, parents=[base])

    jobs_help = (
        "Number of jobs to run simultaneously. "
        "The default value is 4 * cpu_count(). "
    )
    parser.add_argument(
        "-j",
        "--jobs",
        type=int,
        help=jobs_help,
        metavar="<number>",
    )

    targets_help = (
        "Limit command scope to these tracked files/directories, "
        ".dvc files and stage names."
    )
    targets = parser.add_argument("targets", nargs="*", help=targets_help)
    # Attach the completion hint to the argparse action for ``targets``.
    targets.complete = completion.DVC_FILE  # type: ignore[attr-defined]

    return parser


def add_parser(subparsers, _parent_parser):
    """Register the ``pull``, ``push``, ``fetch`` and ``status`` commands.

    Args:
        subparsers: object whose ``add_parser`` method creates each command's
            parser.
        _parent_parser: not used here; every command gets its own parent
            parser from ``shared_parent_parser()``.
    """
    from dvc.commands.status import CmdDataStatus

    def command(name, summary, **extra):
        # Create one sub-command parser; ``summary`` is used both as the
        # short help and, with a doc link appended, as the description.
        return subparsers.add_parser(
            name,
            parents=[shared_parent_parser()],
            description=append_doc_link(summary, name),
            help=summary,
            formatter_class=formatter.RawDescriptionHelpFormatter,
            **extra,
        )

    def switch(parser, *names, text):
        # Boolean option that is False unless given on the command line.
        parser.add_argument(*names, action="store_true", default=False, help=text)

    def remote_option(parser, text):
        # ``-r/--remote <name>`` with a command-specific help text.
        parser.add_argument("-r", "--remote", help=text, metavar="<name>")

    # Pull
    pull = command(
        "pull", "Download tracked files or directories from remote storage."
    )
    remote_option(pull, "Remote storage to pull from")
    switch(pull, "-a", "--all-branches", text="Fetch cache for all branches.")
    switch(pull, "-T", "--all-tags", text="Fetch cache for all tags.")
    switch(pull, "-A", "--all-commits", text="Fetch cache for all commits.")
    switch(
        pull,
        "-f",
        "--force",
        text="Do not prompt when removing working directory files.",
    )
    switch(
        pull,
        "-d",
        "--with-deps",
        text="Fetch cache for all dependencies of the specified target.",
    )
    switch(
        pull,
        "-R",
        "--recursive",
        text="Pull cache for subdirectories of the specified directory.",
    )
    switch(pull, "--run-cache", text="Fetch run history for all stages.")
    # ``--glob`` is accepted by pull but hidden from its help output.
    switch(pull, "--glob", text=argparse.SUPPRESS)
    switch(
        pull,
        "--allow-missing",
        text="Ignore errors if some of the files or directories are missing.",
    )
    pull.set_defaults(func=CmdDataPull)

    # Push
    push = command("push", "Upload tracked files or directories to remote storage.")
    remote_option(push, "Remote storage to push to")
    switch(push, "-a", "--all-branches", text="Push cache for all branches.")
    switch(push, "-T", "--all-tags", text="Push cache for all tags.")
    switch(push, "-A", "--all-commits", text="Push cache for all commits.")
    switch(
        push,
        "-d",
        "--with-deps",
        text="Push cache for all dependencies of the specified target.",
    )
    switch(
        push,
        "-R",
        "--recursive",
        text="Push cache for subdirectories of specified directory.",
    )
    switch(push, "--run-cache", text="Push run history for all stages.")
    switch(push, "--glob", text="Allows targets containing shell-style wildcards.")
    push.set_defaults(func=CmdDataPush)

    # Fetch
    fetch = command(
        "fetch", "Download files or directories from remote storage to the cache."
    )
    remote_option(fetch, "Remote storage to fetch from")
    switch(fetch, "-a", "--all-branches", text="Fetch cache for all branches.")
    switch(fetch, "-T", "--all-tags", text="Fetch cache for all tags.")
    switch(fetch, "-A", "--all-commits", text="Fetch cache for all commits.")
    switch(
        fetch,
        "-d",
        "--with-deps",
        text="Fetch cache for all dependencies of the specified target.",
    )
    switch(
        fetch,
        "-R",
        "--recursive",
        text="Fetch cache for subdirectories of specified directory.",
    )
    switch(fetch, "--run-cache", text="Fetch run history for all stages.")
    fetch.add_argument(
        "--max-size",
        type=int,
        help="Fetch data files/directories that are each below specified size (bytes).",
    )
    fetch.add_argument(
        "--type",
        dest="types",
        action="append",
        default=[],
        help=(
            "Only fetch data files/directories that are of a particular "
            "type (metrics, plots)."
        ),
        choices=["metrics", "plots"],
    )
    fetch.set_defaults(func=CmdDataFetch)

    # Status
    status = command(
        "status",
        "Show changed stages, compare local cache and a remote storage.",
        conflict_handler="resolve",
    )
    switch(
        status,
        "-q",
        "--quiet",
        text=(
            "Suppresses all output."
            " Exit with 0 if pipelines are up to date, otherwise 1."
        ),
    )
    switch(
        status,
        "-c",
        "--cloud",
        text="Show status of a local cache compared to a remote repository.",
    )
    remote_option(status, "Remote storage to compare local cache to")
    switch(
        status,
        "-a",
        "--all-branches",
        text=(
            "Show status of a local cache compared to a remote repository "
            "for all branches."
        ),
    )
    switch(
        status,
        "-T",
        "--all-tags",
        text=(
            "Show status of a local cache compared to a remote repository for all tags."
        ),
    )
    switch(
        status,
        "-A",
        "--all-commits",
        text=(
            "Show status of a local cache compared to a remote repository "
            "for all commits."
        ),
    )
    switch(
        status,
        "-d",
        "--with-deps",
        text="Show status for all dependencies of the specified target.",
    )
    switch(
        status,
        "-R",
        "--recursive",
        text="Show status of all stages in the specified directory.",
    )
    switch(status, "--json", text="Show status in JSON format.")
    status.add_argument(
        "--no-updates",
        dest="check_updates",
        action="store_false",
        help="Ignore updates to imported data.",
    )

    status.set_defaults(func=CmdDataStatus)