# iterative/dvc: dvc/repo/gc.py
from typing import TYPE_CHECKING, Optional

from dvc.exceptions import InvalidArgumentError
from dvc.log import logger

from . import locked

if TYPE_CHECKING:
    from dvc.repo import Repo
    from dvc.repo.index import ObjectContainer

logger = logger.getChild(__name__)


def _validate_args(**kwargs):
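    """Reject flag combinations that `dvc gc` cannot honor."""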
    not_in_remote = kwargs.pop("not_in_remote", None)
    cloud = kwargs.pop("cloud", None)
    remote = kwargs.pop("remote", None)
    if remote and not (cloud or not_in_remote):
        raise InvalidArgumentError("`--remote` requires `--cloud` or `--not-in-remote`")
    if not_in_remote and cloud:
        raise InvalidArgumentError(
            "`--not-in-remote` and `--cloud` are mutually exclusive"
        )
    if not any(kwargs.values()):
        raise InvalidArgumentError(
            "Either of `-w|--workspace`, `-a|--all-branches`, `-T|--all-tags` "
            "`--all-experiments`, `--all-commits`, `--date` or `--rev` "
            "needs to be set."
        )
    if kwargs.get("num") and not kwargs.get("rev"):
        raise InvalidArgumentError("`--num` can only be used alongside `--rev`")

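# Illustrative only (not part of the module): how the guards above surface to
# a caller. These calls are assumptions based on the checks in
# `_validate_args`, not an exhaustive contract.
#
#     _validate_args(workspace=True)                       # ok
#     _validate_args(rev="HEAD~3", num=2, workspace=True)  # ok: `num` with `rev`
#     _validate_args(num=2, workspace=True)                # InvalidArgumentError
#     _validate_args(cloud=True, not_in_remote=True,
#                    workspace=True)                       # InvalidArgumentError
#     _validate_args()                                     # InvalidArgumentError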

def _used_obj_ids_not_in_remote(
    remote_odb_to_obj_ids: "ObjectContainer", jobs: Optional[int] = None
):
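    """Return the used object IDs not present in any of the given remotes."""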
    used_obj_ids = set()
    remote_oids = set()
    for remote_odb, obj_ids in remote_odb_to_obj_ids.items():
        assert remote_odb
        remote_oids.update(
            remote_odb.list_oids_exists(
                {x.value for x in obj_ids if x.value},
                jobs=jobs,
            )
        )
        used_obj_ids.update(obj_ids)
    return {obj for obj in used_obj_ids if obj.value not in remote_oids}
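# Toy illustration of the filtering above (hypothetical IDs):
#
#     remote_odb_to_obj_ids == {odb: {id_a, id_b}}  # id_a already on the remote
#     remote_oids           == {id_a.value}
#     result                == {id_b}  # id_b must stay local; id_a may be swept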


@locked
def gc(  # noqa: C901, PLR0912, PLR0913
    self: "Repo",
    all_branches: bool = False,
    cloud: bool = False,
    remote: Optional[str] = None,
    with_deps: bool = False,
    all_tags: bool = False,
    all_commits: bool = False,
    all_experiments: bool = False,
    force: bool = False,
    jobs: Optional[int] = None,
    repos: Optional[list[str]] = None,
    workspace: bool = False,
    commit_date: Optional[str] = None,
    rev: Optional[str] = None,
    num: Optional[int] = None,
    not_in_remote: bool = False,
    dry: bool = False,
    skip_failed: bool = False,
):
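    """Collect unused objects from the cache.

    With `cloud=True`, the configured remote(s) are swept as well; with
    `not_in_remote=True`, local objects already pushed to the remote also
    become eligible for removal.
    """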
    # `workspace` must be set for gc to take effect; it is assumed to be
    # enabled whenever any of `all_tags`, `all_commits`, `all_experiments`,
    # or `all_branches` is enabled.
    _validate_args(
        workspace=workspace,
        all_tags=all_tags,
        all_commits=all_commits,
        all_branches=all_branches,
        all_experiments=all_experiments,
        commit_date=commit_date,
        rev=rev,
        num=num,
        cloud=cloud,
        not_in_remote=not_in_remote,
    )

    from contextlib import ExitStack

    from dvc.repo import Repo
    from dvc_data.hashfile.db import get_index
    from dvc_data.hashfile.gc import gc as ogc

    if not repos:
        repos = []
    all_repos = [Repo(path) for path in repos]

    odb_to_obj_ids: "ObjectContainer" = {}
    with ExitStack() as stack:
        for repo in all_repos:
            stack.enter_context(repo.lock)

        for repo in [*all_repos, self]:
            for odb, obj_ids in repo.used_objs(
                all_branches=all_branches,
                with_deps=with_deps,
                all_tags=all_tags,
                all_commits=all_commits,
                all_experiments=all_experiments,
                commit_date=commit_date,
                remote=remote,
                force=force,
                jobs=jobs,
                revs=[rev] if rev else None,
                num=num or 1,
                skip_failed=skip_failed,
            ).items():
                odb_to_obj_ids.setdefault(odb, set()).update(obj_ids)

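    # When a remote is involved, fold objects tracked against the default
    # remote (keyed by `None` in the container) into concrete remote ODBs
    # before querying or sweeping them.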
    if cloud or not_in_remote:
        _merge_remote_obj_ids(self, remote, odb_to_obj_ids)
    if not_in_remote:
        used_obj_ids = _used_obj_ids_not_in_remote(odb_to_obj_ids, jobs=jobs)
    else:
        used_obj_ids = set()
        used_obj_ids.update(*odb_to_obj_ids.values())

    for scheme, odb in self.cache.by_scheme():
        if not odb:
            continue
        num_removed = ogc(odb, used_obj_ids, jobs=jobs, dry=dry)
        if num_removed:
            logger.info("Removed %d objects from %s cache.", num_removed, scheme)
        else:
            logger.info("No unused '%s' cache to remove.", scheme)

    if not cloud:
        return

    for remote_odb, obj_ids in odb_to_obj_ids.items():
        assert remote_odb is not None
        num_removed = ogc(remote_odb, obj_ids, jobs=jobs, dry=dry)
        if num_removed:
            get_index(remote_odb).clear()
            logger.info("Removed %d objects from remote.", num_removed)
        else:
            logger.info("No unused cache to remove from remote.")
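# Example usage (illustrative; assumes a DVC repo in the current directory):
#
#     from dvc.repo import Repo
#
#     repo = Repo(".")
#     # Preview which unused objects would be removed.
#     repo.gc(workspace=True, dry=True)
#     # Actually remove them, also sweeping the default remote.
#     repo.gc(workspace=True, cloud=True)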


def _merge_remote_obj_ids(
    repo: "Repo", remote: Optional[str], used_objs: "ObjectContainer"
):
    """Fold used objects tracked against the default remote into concrete
    remote ODBs (both the current md5 ODB and the legacy md5-dos2unix one).
    """
    default_obj_ids = used_objs.pop(None, set())
    remote_odb = repo.cloud.get_remote_odb(remote, "gc -c", hash_name="md5")
    used_objs.setdefault(remote_odb, set()).update(default_obj_ids)
    legacy_odb = repo.cloud.get_remote_odb(remote, "gc -c", hash_name="md5-dos2unix")
    used_objs.setdefault(legacy_odb, set()).update(default_obj_ids)
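# Shape of the container before/after the merge above (hypothetical entries):
#
#     before: {None: {id_a}, per_out_odb: {id_b}}
#     after:  {per_out_odb: {id_b}, default_odb: {id_a}, legacy_odb: {id_a}}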