iterative/dvc

View on GitHub
dvc/stage/decorators.py

Summary

Maintainability
A
0 mins
Test Coverage
from functools import wraps

from funcy import decorator


@decorator
def rwlocked(call, read=None, write=None):
    """Run the wrapped stage call inside the ``rwlock`` context manager.

    ``read`` and ``write`` are iterables of stage attribute names. The
    ``fs_path`` of every item found under those attributes is collected and
    handed to ``rwlock`` as the read set and the write set respectively.
    Leaving either one as ``None`` is the same as passing an empty list.

    The stage is taken from the first positional argument of the wrapped
    call, and its repo lock is asserted to be held already.
    """
    import sys

    from dvc.dependency.db import AbstractDependency
    from dvc.dependency.repo import RepoDependency
    from dvc.rwlock import rwlock

    read = [] if read is None else read
    write = [] if write is None else write

    stage = call._args[0]

    assert stage.repo.lock.is_locked

    def _lockable_paths(attr_names):
        """Return ``fs_path`` of each item under the named stage attributes."""
        paths = []
        for attr_name in attr_names:
            for entry in getattr(stage, attr_name):
                # There is no need to lock RepoDependency deps, as there is
                # no corresponding OutputREPO, so we can't even write it.
                # NOTE(review): AbstractDependency items are skipped as well,
                # presumably for a similar reason - confirm.
                if isinstance(entry, (RepoDependency, AbstractDependency)):
                    continue
                paths.append(entry.fs_path)
        return paths

    # The full command line is passed to rwlock alongside the paths.
    cmd = " ".join(sys.argv)

    tmp_dir = stage.repo.tmp_dir
    fs = stage.repo.fs
    read_paths = _lockable_paths(read)
    write_paths = _lockable_paths(write)
    hardlink_lock = stage.repo.config["core"].get("hardlink_lock", False)

    with rwlock(tmp_dir, fs, cmd, read_paths, write_paths, hardlink_lock):
        return call()


def unlocked_repo(f):
    """Decorate ``f`` so that it runs with the stage's repo lock released.

    The returned wrapper expects the stage as its first positional argument.
    It unlocks ``stage.repo.lock``, calls ``stage.repo._reset()``, invokes
    ``f(stage, *args, **kwargs)`` and finally re-acquires the lock, whether
    ``f`` returned normally or raised.

    Returns:
        The wrapper function; calling it returns whatever ``f`` returns.
    """

    @wraps(f)
    def wrapper(stage, *args, **kwargs):
        stage.repo.lock.unlock()
        try:
            # Kept inside the ``try`` so that a failing ``_reset()`` cannot
            # hand control back to the caller with the lock still released.
            stage.repo._reset()
            return f(stage, *args, **kwargs)
        finally:
            stage.repo.lock.lock()

    return wrapper


def relock_repo(f):
    """Decorate ``f`` so that it runs while holding the stage's repo lock.

    The returned wrapper expects the stage as its first positional argument.
    It acquires ``stage.repo.lock``, invokes ``f(stage, *args, **kwargs)``
    and then, whether ``f`` returned or raised, releases the lock and calls
    ``stage.repo._reset()``.

    Returns:
        The wrapper function; calling it returns whatever ``f`` returns.
    """

    @wraps(f)
    def _with_repo_lock(stage, *args, **kwargs):
        # Acquire before entering ``try`` so a failed acquisition is never
        # followed by an unlock attempt.
        stage.repo.lock.lock()
        try:
            return f(stage, *args, **kwargs)
        finally:
            stage.repo.lock.unlock()
            stage.repo._reset()

    return _with_repo_lock