iterative/dvc

View on GitHub
dvc/testing/remote_tests.py

Summary

Maintainability
F
6 days
Test Coverage
import os
import shutil

import pytest

from dvc.stage.cache import RunCacheNotSupported
from dvc.utils.fs import remove
from dvc_data.hashfile.tree import Tree


def _check_status(status, **kwargs):
    for key in ("ok", "missing", "new", "deleted"):
        expected = kwargs.get(key, set())
        assert expected == set(getattr(status, key))


class TestRemote:
    def test(self, tmp_dir, dvc, remote):
        (stage,) = tmp_dir.dvc_gen("foo", "foo")
        out = stage.outs[0]
        cache = out.cache_path
        foo_hash = out.hash_info
        foo_hashes = out.get_used_objs().get(None, set())

        (stage_dir,) = tmp_dir.dvc_gen(
            {
                "data_dir": {
                    "data_sub_dir": {"data_sub": "data_sub"},
                    "data": "data",
                    "empty": "",
                }
            }
        )

        out_dir = stage_dir.outs[0]
        cache_dir = out_dir.cache_path
        dir_hash = out_dir.hash_info
        dir_hashes = {dir_hash} | {oid for _, _, oid in out_dir.obj}

        # Check status
        status = dvc.cloud.status(foo_hashes)
        _check_status(status, new={foo_hash})

        status_dir = dvc.cloud.status(dir_hashes)
        _check_status(status_dir, new=dir_hashes)

        # Move cache and check status
        # See issue https://github.com/iterative/dvc/issues/4383 for details
        backup_dir = dvc.cache.local.path + ".backup"
        shutil.move(dvc.cache.local.path, backup_dir)
        status = dvc.cloud.status(foo_hashes)
        _check_status(status, missing={foo_hash})

        status_dir = dvc.cloud.status(dir_hashes)
        _check_status(status_dir, missing=dir_hashes)

        # Restore original cache:
        remove(dvc.cache.local.path)
        shutil.move(backup_dir, dvc.cache.local.path)

        # Push and check status
        dvc.cloud.push(foo_hashes)
        assert os.path.exists(cache)
        assert os.path.isfile(cache)

        dvc.cloud.push(dir_hashes)
        assert os.path.isfile(cache_dir)

        status = dvc.cloud.status(foo_hashes)
        _check_status(status, ok={foo_hash})

        status_dir = dvc.cloud.status(dir_hashes)
        _check_status(status_dir, ok=dir_hashes)

        # Remove and check status
        dvc.cache.local.clear()

        status = dvc.cloud.status(foo_hashes)
        _check_status(status, deleted={foo_hash})

        status_dir = dvc.cloud.status(dir_hashes)
        _check_status(status_dir, deleted=dir_hashes)

        # Pull and check status
        dvc.cloud.pull(foo_hashes)
        assert os.path.exists(cache)
        assert os.path.isfile(cache)
        with open(cache, encoding="utf-8") as fd:
            assert fd.read() == "foo"

        dvc.cloud.pull(dir_hashes)
        assert os.path.isfile(cache_dir)

        status = dvc.cloud.status(foo_hashes)
        _check_status(status, ok={foo_hash})

        status_dir = dvc.cloud.status(dir_hashes)
        _check_status(status_dir, ok=dir_hashes)

    @pytest.mark.xfail(raises=RunCacheNotSupported, strict=False)
    def test_stage_cache_push_pull(self, tmp_dir, dvc, remote):
        tmp_dir.gen("foo", "foo")
        stage = dvc.stage.add(
            deps=["foo"], outs=["bar"], name="copy-foo-bar", cmd="cp foo bar"
        )
        dvc.reproduce(stage.addressing)
        assert dvc.push(run_cache=True) == 2

        stage_cache_dir = tmp_dir / dvc.stage_cache.cache_dir
        expected = list(stage_cache_dir.rglob("*"))
        shutil.rmtree(stage_cache_dir)

        dvc.pull(run_cache=True)
        assert list(stage_cache_dir.rglob("*")) == expected

    @pytest.mark.xfail(raises=NotImplementedError, strict=False)
    def test_pull_00_prefix(self, tmp_dir, dvc, remote, monkeypatch):
        # Related: https://github.com/iterative/dvc/issues/6089

        fs_type = type(dvc.cloud.get_remote_odb("upstream").fs)
        monkeypatch.setattr(fs_type, "_ALWAYS_TRAVERSE", True, raising=False)
        monkeypatch.setattr(fs_type, "LIST_OBJECT_PAGE_SIZE", 256, raising=False)

        # foo's md5 checksum is 00411460f7c92d2124a67ea0f4cb5f85
        # bar's md5 checksum is 0000000018e6137ac2caab16074784a6
        foo_out = tmp_dir.dvc_gen("foo", "363")[0].outs[0]
        bar_out = tmp_dir.dvc_gen("bar", "jk8ssl")[0].outs[0]
        expected_hashes = {foo_out.hash_info, bar_out.hash_info}

        dvc.push()
        status = dvc.cloud.status(expected_hashes)
        _check_status(status, ok=expected_hashes)

        dvc.cache.local.clear()
        remove(tmp_dir / "foo")
        remove(tmp_dir / "bar")

        stats = dvc.pull()
        assert stats["fetched"] == 2
        assert set(stats["added"]) == {"foo", "bar"}

    @pytest.mark.xfail(raises=NotImplementedError, strict=False)
    def test_pull_no_00_prefix(self, tmp_dir, dvc, remote, monkeypatch):
        # Related: https://github.com/iterative/dvc/issues/6244

        fs_type = type(dvc.cloud.get_remote_odb("upstream").fs)
        monkeypatch.setattr(fs_type, "_ALWAYS_TRAVERSE", True, raising=False)
        monkeypatch.setattr(fs_type, "LIST_OBJECT_PAGE_SIZE", 256, raising=False)

        # foo's md5 checksum is 14ffd92a6cbf5f2f657067df0d5881a6
        # bar's md5 checksum is 64020400f00960c0ef04052547b134b3
        foo_out = tmp_dir.dvc_gen("foo", "dvc")[0].outs[0]
        bar_out = tmp_dir.dvc_gen("bar", "cml")[0].outs[0]
        expected_hashes = {foo_out.hash_info, bar_out.hash_info}

        dvc.push()
        status = dvc.cloud.status(expected_hashes)
        _check_status(status, ok=expected_hashes)

        dvc.cache.local.clear()
        remove(tmp_dir / "foo")
        remove(tmp_dir / "bar")

        stats = dvc.pull()
        assert stats["fetched"] == 2
        assert set(stats["added"]) == {"foo", "bar"}


class TestRemoteVersionAware:
    def test_file(self, tmp_dir, dvc, run_copy, remote_version_aware):
        (stage,) = tmp_dir.dvc_gen("foo", "foo")
        run_copy("foo", "foo_copy", name="copy")

        assert dvc.push()
        assert (remote_version_aware / "foo").read_text() == "foo"
        assert (remote_version_aware / "foo_copy").read_text() == "foo"
        foo_dvc = (tmp_dir / "foo.dvc").read_text()
        assert "version_id" in foo_dvc
        stage = stage.reload()
        out = stage.outs[0]
        assert out.meta.version_id
        dvc_lock = (tmp_dir / "dvc.lock").read_text()

        remove(dvc.cache.local.path)
        remove(tmp_dir / "foo")
        remove(tmp_dir / "foo_copy")

        assert dvc.pull()
        assert (tmp_dir / "foo").read_text() == "foo"
        assert (tmp_dir / "foo_copy").read_text() == "foo"
        assert (tmp_dir / "foo.dvc").read_text() == foo_dvc
        assert (tmp_dir / "dvc.lock").read_text() == dvc_lock

        assert not dvc.push()
        assert (remote_version_aware / "foo").read_text() == "foo"
        assert (remote_version_aware / "foo_copy").read_text() == "foo"
        assert (tmp_dir / "foo.dvc").read_text() == foo_dvc
        assert (tmp_dir / "dvc.lock").read_text() == dvc_lock

        dvc.reproduce()
        assert not dvc.push()
        assert (remote_version_aware / "foo").read_text() == "foo"
        assert (remote_version_aware / "foo_copy").read_text() == "foo"
        assert (tmp_dir / "foo.dvc").read_text() == foo_dvc
        assert (tmp_dir / "dvc.lock").read_text() == dvc_lock

    def test_dir(self, tmp_dir, dvc, run_copy, remote_version_aware):  # noqa: PLR0915
        (stage,) = tmp_dir.dvc_gen(
            {
                "data_dir": {
                    "data_sub_dir": {"data_sub": "data_sub"},
                    "data": "data",
                    "empty": "",
                }
            }
        )

        assert not dvc.fetch()
        assert dvc.push()

        data_dir_dvc = (tmp_dir / "data_dir.dvc").read_text()
        assert "files" in data_dir_dvc
        assert "version_id" in data_dir_dvc
        stage = stage.reload()
        out = stage.outs[0]
        assert out.files
        for file in out.files:
            assert file["version_id"]
            assert file["remote"] == "upstream"

        remove(dvc.cache.local.path)
        remove(tmp_dir / "data_dir")

        assert dvc.pull()
        assert (tmp_dir / "data_dir" / "data").read_text() == "data"
        assert (
            tmp_dir / "data_dir" / "data_sub_dir" / "data_sub"
        ).read_text() == "data_sub"
        assert (tmp_dir / "data_dir.dvc").read_text() == data_dir_dvc

        run_copy("data_dir", "data_dir_copy", name="copy")
        dvc_lock = (tmp_dir / "dvc.lock").read_text()

        assert dvc.push()
        assert (remote_version_aware / "data_dir").exists()
        assert (remote_version_aware / "data_dir" / "data").exists()
        assert (remote_version_aware / "data_dir_copy").exists()
        assert (remote_version_aware / "data_dir_copy" / "data").exists()
        assert (tmp_dir / "data_dir.dvc").read_text() == data_dir_dvc
        assert (tmp_dir / "dvc.lock").read_text() != dvc_lock
        dvc_lock = (tmp_dir / "dvc.lock").read_text()

        assert not dvc.push()
        assert (remote_version_aware / "data_dir").exists()
        assert (remote_version_aware / "data_dir" / "data").exists()
        assert (remote_version_aware / "data_dir_copy").exists()
        assert (remote_version_aware / "data_dir_copy" / "data").exists()
        assert (tmp_dir / "data_dir.dvc").read_text() == data_dir_dvc
        assert (tmp_dir / "dvc.lock").read_text() == dvc_lock

        dvc.cache.local.clear()
        remove(tmp_dir / "data_dir")
        remove(tmp_dir / "data_dir_copy")
        assert not dvc.push()
        assert (remote_version_aware / "data_dir").exists()
        assert (remote_version_aware / "data_dir" / "data").exists()
        assert (remote_version_aware / "data_dir_copy").exists()
        assert (remote_version_aware / "data_dir_copy" / "data").exists()
        assert (tmp_dir / "data_dir.dvc").read_text() == data_dir_dvc
        assert (tmp_dir / "dvc.lock").read_text() == dvc_lock

        (remote_version_aware / "data_dir").rmdir()
        (remote_version_aware / "data_dir_copy").rmdir()
        assert not (remote_version_aware / "data_dir").exists()
        assert not (remote_version_aware / "data_dir_copy").exists()
        assert dvc.pull()
        assert (tmp_dir / "data_dir" / "data").read_text() == "data"
        assert (
            tmp_dir / "data_dir" / "data_sub_dir" / "data_sub"
        ).read_text() == "data_sub"
        assert (tmp_dir / "data_dir_copy" / "data").read_text() == "data"
        assert (
            tmp_dir / "data_dir_copy" / "data_sub_dir" / "data_sub"
        ).read_text() == "data_sub"


class TestRemoteWorktree:
    def test_file(self, tmp_dir, dvc, remote_worktree):
        (stage,) = tmp_dir.dvc_gen("foo", "foo")

        dvc.push()
        assert "version_id" in (tmp_dir / "foo.dvc").read_text()
        stage = stage.reload()
        out = stage.outs[0]
        assert out.meta.version_id

        remove(dvc.cache.local.path)
        remove(tmp_dir / "foo")

        dvc.pull()
        assert (tmp_dir / "foo").read_text() == "foo"

    def test_dir(self, tmp_dir, dvc, remote_worktree):
        (stage,) = tmp_dir.dvc_gen(
            {
                "data_dir": {
                    "data_sub_dir": {"data_sub": "data_sub"},
                    "data": "data",
                    "empty": "",
                }
            }
        )

        dvc.push()
        assert "files" in (tmp_dir / "data_dir.dvc").read_text()
        assert "version_id" in (tmp_dir / "data_dir.dvc").read_text()
        stage = stage.reload()
        out = stage.outs[0]
        assert out.files
        for file in out.files:
            assert file["version_id"]
            assert file["remote"] == "upstream"

        remove(dvc.cache.local.path)
        remove(tmp_dir / "data_dir")

        dvc.pull()
        assert (tmp_dir / "data_dir" / "data").read_text() == "data"
        assert (
            tmp_dir / "data_dir" / "data_sub_dir" / "data_sub"
        ).read_text() == "data_sub"

    def test_deletion(self, tmp_dir, dvc, scm, remote_worktree):
        tmp_dir.dvc_gen(
            {
                "data_dir": {
                    "data_sub_dir": {"data_sub": "data_sub"},
                    "data": "data",
                    "empty": "",
                }
            }
        )
        dvc.push()
        assert (remote_worktree / "data_dir" / "data").exists()
        tmp_dir.scm_add([tmp_dir / "data_dir.dvc"], commit="v1")
        v1 = scm.get_rev()
        remove(tmp_dir / "data_dir" / "data")
        dvc.add(str(tmp_dir / "data_dir"))

        # data_dir/data should show as deleted in the remote
        dvc.push()
        tmp_dir.scm_add([tmp_dir / "data_dir.dvc"], commit="v2")
        assert not (remote_worktree / "data_dir" / "data").exists()

        remove(dvc.cache.local.path)
        remove(tmp_dir / "data_dir")
        # pulling the original pushed version should still succeed
        scm.checkout(v1)
        dvc.pull()
        assert (tmp_dir / "data_dir" / "data").read_text() == "data"

    def test_update(self, tmp_dir, dvc, remote_worktree):
        (foo_stage,) = tmp_dir.dvc_gen("foo", "foo")
        (data_dir_stage,) = tmp_dir.dvc_gen(
            {
                "data_dir": {
                    "data_sub_dir": {"data_sub": "data_sub"},
                    "data": "data",
                    "empty": "",
                }
            }
        )
        dvc.push()
        orig_foo = foo_stage.reload().outs[0]
        orig_data_dir = data_dir_stage.reload().outs[0]
        (remote_worktree / "foo").write_text("bar")
        (remote_worktree / "data_dir" / "data").write_text("modified")
        (remote_worktree / "data_dir" / "new_data").write_text("new data")

        dvc.update([str(tmp_dir / "foo.dvc"), str(tmp_dir / "data_dir.dvc")])
        updated_foo = foo_stage.reload().outs[0]
        updated_data_dir = data_dir_stage.reload().outs[0]

        assert updated_foo.meta.version_id
        assert updated_foo.meta.version_id != orig_foo.meta.version_id
        updated_data_dir = data_dir_stage.reload().outs[0]
        orig_tree = orig_data_dir.get_obj()
        updated_tree = Tree.from_list(updated_data_dir.files, hash_name="md5")
        assert orig_tree.get(("data_sub_dir", "data_sub")) == updated_tree.get(
            ("data_sub_dir", "data_sub")
        )
        orig_meta, _ = orig_tree.get(("data",))
        updated_meta, _ = updated_tree.get(("data",))
        assert orig_meta.version_id
        assert updated_meta.version_id
        assert orig_meta.version_id != updated_meta.version_id
        meta, hash_info = updated_tree.get(("new_data",))
        assert meta
        assert hash_info

        assert (tmp_dir / "foo").read_text() == "bar"
        assert (tmp_dir / "data_dir" / "data").read_text() == "modified"
        assert (tmp_dir / "data_dir" / "new_data").read_text() == "new data"

        remove(dvc.cache.local.path)
        remove(tmp_dir / "foo")
        remove(tmp_dir / "data_dir")
        dvc.pull()
        assert (tmp_dir / "foo").read_text() == "bar"
        assert (tmp_dir / "data_dir" / "data").read_text() == "modified"
        assert (tmp_dir / "data_dir" / "new_data").read_text() == "new data"