iterative/dvc

View on GitHub
dvc/rwlock.py

Summary

Maintainability
B
5 hrs
Test Coverage
import json
import os
from collections import defaultdict
from contextlib import contextmanager

import psutil
from voluptuous import Invalid, Optional, Required, Schema

from dvc.log import logger

from .exceptions import DvcException
from .fs import localfs
from .lock import make_lock
from .utils import relpath

# Module-specific child of the shared dvc logger.
logger = logger.getChild(__name__)


# Shape of a single lock-holder record: the holder's process id and command.
INFO_SCHEMA = {Required("pid"): int, Required("cmd"): str}

# Shape of the whole rwlock file. "write" maps a path to one holder record,
# "read" maps a path to a list of holder records. Either section defaults to
# an empty dict when it is missing from the file.
SCHEMA = Schema(
    {
        Optional("write", default={}): {str: INFO_SCHEMA},
        Optional("read", default={}): {str: [INFO_SCHEMA]},
    }
)

# File names joined onto the lock directory by _edit_rwlock: the JSON state
# file and the lock file that guards edits to it.
RWLOCK_FILE = "rwlock"
RWLOCK_LOCK = "rwlock.lock"


class RWLockFileCorruptedError(DvcException):
    """Raised when the rwlock file cannot be decoded as JSON."""

    def __init__(self, path):
        location = relpath(path)
        super().__init__(
            f"Unable to read RWLock-file {location!r}. JSON structure is corrupted"
        )


class RWLockFileFormatError(DvcException):
    """Raised when the rwlock file is valid JSON but fails schema validation."""

    def __init__(self, path):
        location = relpath(path)
        message = f"RWLock-file {location!r} format error."
        super().__init__(message)


@contextmanager
def _edit_rwlock(lock_dir, fs, hardlink):
    """Yield the parsed rwlock state for editing, then write it back.

    The whole read-modify-write cycle runs while holding the guard lock
    created by ``make_lock``. A missing rwlock file is treated as an empty
    state. The state is written back only if the ``with`` body finishes
    without raising.

    Args:
        lock_dir: directory containing the rwlock file and its guard lock.
        fs: filesystem object used to join paths and open the rwlock file.
        hardlink: forwarded to ``make_lock`` as ``hardlink_lock``.

    Raises:
        RWLockFileCorruptedError: the rwlock file is not valid JSON.
        RWLockFileFormatError: the rwlock file does not match ``SCHEMA``.
    """
    rwlock_path = fs.join(lock_dir, RWLOCK_FILE)
    guard_path = fs.join(lock_dir, RWLOCK_LOCK)

    with make_lock(guard_path, tmp_dir=lock_dir, hardlink_lock=hardlink):
        try:
            with fs.open(rwlock_path, encoding="utf-8") as stream:
                state = SCHEMA(json.load(stream))
        except FileNotFoundError:
            # No rwlock file yet: start from the schema defaults.
            state = SCHEMA({})
        except json.JSONDecodeError as exc:
            raise RWLockFileCorruptedError(rwlock_path) from exc
        except Invalid as exc:
            raise RWLockFileFormatError(rwlock_path) from exc

        # Wrap both sections so callers can index unseen paths directly.
        for section, factory in (("read", list), ("write", dict)):
            state[section] = defaultdict(factory, state[section])

        yield state

        with fs.open(rwlock_path, "w", encoding="utf-8") as stream:
            json.dump(state, stream)


def _infos_to_str(infos):
    return "\n".join(
        "  (PID {}): {}".format(info["pid"], info["cmd"]) for info in infos
    )


def _check_blockers(tmp_dir, lock, info, *, mode, waiters):  # noqa: C901, PLR0912
    """Raise if a waiter path overlaps a path held by another live process.

    Every entry under ``lock[mode]`` whose path overlaps one of ``waiters`` is
    inspected. Records equal to ``info`` are ignored. Records whose PID no
    longer exists (per ``psutil.pid_exists``) are logged and released from
    ``lock``. Records whose PID exists are collected as blockers.

    Args:
        tmp_dir: directory of the rwlock file; only used in the error message.
        lock: rwlock state; mutated in place when stale records are released.
        info: the caller's own ``{"pid": ..., "cmd": ...}`` record.
        mode: ``"read"`` or ``"write"``, the section of ``lock`` to scan.
        waiters: paths the caller wants to lock.

    Raises:
        LockError: at least one overlapping record belongs to a live process.
    """
    from .lock import LockError

    dead_pids = set()
    blockers = []
    # Waiter path that hit the first live blocker. It is captured inside the
    # loop because the loop variable no longer refers to the blocked path once
    # later, non-overlapping entries have been scanned.
    busy_path = None
    to_release = defaultdict(list)

    for path, infos in lock[mode].items():
        for waiter_path in waiters:
            if localfs.overlaps(waiter_path, path):
                break
        else:
            # None of the waiters touches this locked path.
            continue

        # "write" entries hold a single record, "read" entries hold a list.
        holders = infos if isinstance(infos, list) else [infos]
        for blocker in holders:
            if blocker == info:
                continue

            pid = int(blocker["pid"])

            if pid not in dead_pids:
                if psutil.pid_exists(pid):
                    blockers.append(blocker)
                    if busy_path is None:
                        busy_path = waiter_path
                    continue

                # Warn only once per dead PID.
                dead_pids.add(pid)
                cmd = blocker["cmd"]
                logger.warning(
                    (
                        "Process '%s' with (Pid %s), in RWLock-file '%s'"
                        " had been killed. Auto removed it from the lock file."
                    ),
                    cmd,
                    pid,
                    relpath(path),
                )
            # Records are dicts (unhashable), so group them by canonical JSON.
            to_release[json.dumps(blocker, sort_keys=True)].append(path)

    # Release only after the scan so lock[mode] is not mutated mid-iteration.
    for stale_json, stale_paths in to_release.items():
        stale_info = json.loads(stale_json)
        if mode == "read":
            _release_read(lock, stale_info, stale_paths)
        elif mode == "write":
            _release_write(lock, stale_info, stale_paths)

    if blockers:
        raise LockError(
            f"'{busy_path}' is busy, it is being blocked by:\n"
            f"{_infos_to_str(blockers)}\n"
            "\n"
            "If there are no processes with such PIDs, you can manually "
            f"remove '{tmp_dir}/rwlock' and try again."
        )


def _acquire_read(lock, info, paths):
    changes = []

    lock["read"] = lock.get("read", defaultdict(list))

    for path in paths:
        readers = lock["read"][path]
        if info in readers:
            continue

        changes.append(path)
        readers.append(info)

    return changes


def _acquire_write(lock, info, paths):
    changes = []

    lock["write"] = lock.get("write", defaultdict(dict))

    for path in paths:
        if lock["write"][path] == info:
            continue

        changes.append(path)
        lock["write"][path] = info

    return changes


def _release_write(lock, info, changes):
    for url in changes:
        assert "write" in lock
        assert url in lock["write"]
        assert lock["write"][url] == info
        del lock["write"][url]
        if not lock["write"]:
            del lock["write"]


def _release_read(lock, info, changes):
    for url in changes:
        assert "read" in lock
        assert url in lock["read"]
        assert info in lock["read"][url]
        lock["read"][url].remove(info)
        if not lock["read"][url]:
            del lock["read"][url]
        if not lock["read"]:
            del lock["read"]


@contextmanager
def rwlock(tmp_dir, fs, cmd, read, write, hardlink):
    """Hold a non-thread-safe read/write lock on file paths for a ``with`` body.

    On entry the rwlock file is checked for conflicting holders and this
    process is recorded as reader/writer of the given paths. On exit, whether
    or not the body raised, exactly the records added on entry are removed.

    Args:
        tmp_dir (str): existing directory where the rwlock file is created.
        fs (FileSystem): fs instance that tmp_dir belongs to.
        cmd (str): command that will be working on these file paths.
        read ([str]): file paths that are going to be read.
        write ([str]): file paths that are going to be written.
        hardlink (bool): use a hardlink lock to guard the rwlock file while
            it is being edited.

    Raises:
        LockError: raised if a path we want to read is being written to by
            another command, or if a path we want to write is being written
            to or read from by another command.
        RWLockFileCorruptedError: raised if the rwlock file is not valid JSON.
        RWLockFileFormatError: raised if the rwlock file is valid JSON but
            has an internal format that doesn't pass our schema validation.
    """
    owner = {"pid": os.getpid(), "cmd": cmd}

    with _edit_rwlock(tmp_dir, fs, hardlink) as state:
        # Existing writers conflict with everything we touch; existing
        # readers conflict only with the paths we intend to write.
        for mode, wanted in (("write", read + write), ("read", write)):
            _check_blockers(tmp_dir, state, owner, mode=mode, waiters=wanted)

        acquired_reads = _acquire_read(state, owner, read)
        acquired_writes = _acquire_write(state, owner, write)

    try:
        yield
    finally:
        with _edit_rwlock(tmp_dir, fs, hardlink) as state:
            _release_write(state, owner, acquired_writes)
            _release_read(state, owner, acquired_reads)