# dvc/testing/workspace_tests.py
import os
from typing import Union
import pytest
from funcy import first
from dvc.exceptions import URLMissingError
from dvc.repo import Repo
from dvc.repo.ls_url import ls_url, parse_external_url
from dvc.utils.fs import remove
class TestImport:
    """``dvc import-url`` tests against a ``remote://workspace/...`` URL.

    Relies on project fixtures: ``tmp_dir``, ``dvc`` and ``workspace``.
    Concrete per-cloud suites subclass this and override the hash
    fixtures below; here they skip by default.
    """

    def test_import(self, tmp_dir, dvc, workspace):
        """Import a single file and verify contents and clean status."""
        workspace.gen("file", "file")
        assert not (tmp_dir / "file").exists()  # sanity check
        dvc.imp_url("remote://workspace/file")
        assert (tmp_dir / "file").read_text() == "file"
        # empty status means the imported file is up to date
        assert dvc.status() == {}

    @pytest.fixture
    def stage_md5(self):
        # Placeholder: concrete suites override this with the expected
        # .dvc stage md5; skipping here disables the text comparison below.
        pytest.skip()

    @pytest.fixture
    def dir_md5(self):
        # Placeholder: overridden by concrete suites with the expected
        # dependency directory md5.
        pytest.skip()

    def test_import_dir(self, tmp_dir, dvc, workspace, stage_md5, dir_md5):
        """Import a directory tree and verify layout, status and .dvc text."""
        from dvc.cachemgr import CacheManager

        workspace.gen({"dir": {"file": "file", "subdir": {"subfile": "subfile"}}})
        # remove external cache to make sure that we don't need it
        # to import dirs
        with dvc.config.edit() as conf:
            del conf["cache"]
        # rebuild the cache manager so the removed config takes effect
        dvc.cache = CacheManager(dvc)
        assert not (tmp_dir / "dir").exists()  # sanity check
        dvc.imp_url("remote://workspace/dir")
        assert set(os.listdir(tmp_dir / "dir")) == {"file", "subdir"}
        assert (tmp_dir / "dir" / "file").read_text() == "file"
        assert list(os.listdir(tmp_dir / "dir" / "subdir")) == ["subfile"]
        assert (tmp_dir / "dir" / "subdir" / "subfile").read_text() == "subfile"
        assert dvc.status() == {}
        # only compare the full .dvc text when the subclass supplied hashes
        if stage_md5 is not None and dir_md5 is not None:
            assert (tmp_dir / "dir.dvc").read_text() == (
                f"md5: {stage_md5}\n"
                "frozen: true\n"
                "deps:\n"
                f"- md5: {dir_md5}\n"
                "  size: 11\n"
                "  nfiles: 2\n"
                "  hash: md5\n"
                "  path: remote://workspace/dir\n"
                "outs:\n"
                "- md5: b6dcab6ccd17ca0a8bf4a215a37d14cc.dir\n"
                "  size: 11\n"
                "  nfiles: 2\n"
                "  hash: md5\n"
                "  path: dir\n"
            )

    @pytest.fixture
    def is_object_storage(self):
        # Placeholder: concrete suites override this to say whether the
        # cloud is prefix-based object storage (e.g. s3).
        pytest.skip()

    def test_import_empty_dir(self, tmp_dir, dvc, workspace, is_object_storage):
        """Import an empty directory (emulated on object storage)."""
        # prefix based storage services (e.g s3) doesn't have the real concept
        # of directories. So instead we create an empty file that ends with a
        # trailing slash in order to actually support this operation
        if is_object_storage:
            contents: Union[str, dict[str, str]] = ""
        else:
            contents = {}
        workspace.gen({"empty_dir/": contents})

        dvc.imp_url("remote://workspace/empty_dir/")

        empty_dir = tmp_dir / "empty_dir"
        assert empty_dir.is_dir()
        assert tuple(empty_dir.iterdir()) == ()
class TestImportURLVersionAware:
    """``dvc import-url --version-aware`` tests against a versioned remote.

    Relies on project fixtures: ``tmp_dir``, ``dvc``, ``scm`` and
    ``remote_version_aware``.
    """

    def test_import_file(self, tmp_dir, dvc, remote_version_aware):
        """Import, pull, update and re-pull a single versioned file."""
        remote_version_aware.gen("file", "file")
        dvc.imp_url("remote://upstream/file", version_aware=True)
        stage = first(dvc.index.stages)
        # version-aware imports must not be pushable back to the remote
        assert not stage.outs[0].can_push
        assert (tmp_dir / "file").read_text() == "file"
        assert dvc.status() == {}
        orig_version_id = stage.deps[0].meta.version_id
        orig_def_path = stage.deps[0].def_path

        # pulling after clearing cache and workspace restores the pinned version
        dvc.cache.local.clear()
        remove(tmp_dir / "file")
        dvc.pull()
        assert (tmp_dir / "file").read_text() == "file"

        # modifying the remote copy surfaces as an available update
        (remote_version_aware / "file").write_text("modified")
        assert dvc.status().get("file.dvc") == [
            {"changed deps": {"remote://upstream/file": "update available"}},
            {"changed outs": {"file": "not in cache"}},
        ]
        dvc.update(str(tmp_dir / "file.dvc"))
        assert (tmp_dir / "file").read_text() == "modified"
        assert dvc.status() == {}
        stage = first(dvc.index.stages)
        # update pins a new version id but keeps the same dependency path
        assert orig_version_id != stage.deps[0].meta.version_id
        assert orig_def_path == stage.deps[0].def_path

        # pulling again restores the updated version
        dvc.cache.local.clear()
        remove(tmp_dir / "file")
        dvc.pull()
        assert (tmp_dir / "file").read_text() == "modified"

    def test_import_dir(self, tmp_dir, dvc, remote_version_aware):
        """Import, pull, update and re-pull a versioned directory tree."""
        remote_version_aware.gen({"data_dir": {"subdir": {"file": "file"}}})
        dvc.imp_url("remote://upstream/data_dir", version_aware=True)
        stage = first(dvc.index.stages)
        # version-aware imports must not be pushable back to the remote
        assert not stage.outs[0].can_push
        assert (tmp_dir / "data_dir" / "subdir" / "file").read_text() == "file"
        assert dvc.status() == {}

        # pull restores the pinned directory after cache/workspace wipe
        dvc.cache.local.clear()
        remove(tmp_dir / "data_dir")
        dvc.pull()
        assert (tmp_dir / "data_dir" / "subdir" / "file").read_text() == "file"

        # modifying one file and adding another marks the dep as modified
        (remote_version_aware / "data_dir" / "subdir" / "file").write_text("modified")
        (remote_version_aware / "data_dir" / "new_file").write_text("new")
        assert dvc.status().get("data_dir.dvc") == [
            {"changed deps": {"remote://upstream/data_dir": "modified"}},
            {"changed outs": {"data_dir": "not in cache"}},
        ]
        dvc.update(str(tmp_dir / "data_dir.dvc"))
        assert (tmp_dir / "data_dir" / "subdir" / "file").read_text() == "modified"
        assert (tmp_dir / "data_dir" / "new_file").read_text() == "new"
        assert dvc.status() == {}

        # pulling again restores the updated directory contents
        dvc.cache.local.clear()
        remove(tmp_dir / "data_dir")
        dvc.pull()
        assert (tmp_dir / "data_dir" / "subdir" / "file").read_text() == "modified"
        assert (tmp_dir / "data_dir" / "new_file").read_text() == "new"

    def test_import_no_download(self, tmp_dir, dvc, remote_version_aware, scm):
        """Import/update with --no-download, then fetch per git revision."""
        remote_version_aware.gen({"data_dir": {"subdir": {"file": "file"}}})
        dvc.imp_url("remote://upstream/data_dir", version_aware=True, no_download=True)
        scm.add(["data_dir.dvc", ".gitignore"])
        scm.commit("v1")
        scm.tag("v1")
        stage = first(dvc.index.stages)
        assert not stage.outs[0].can_push

        # update metadata only, then pull fetches both files of the new version
        (remote_version_aware / "data_dir" / "foo").write_text("foo")
        dvc.update(no_download=True)
        assert dvc.pull()["fetched"] == 2
        # NOTE: TmpDir.read_text on a directory returns a nested dict of
        # its contents (pytest-test-utils helper) — presumably; see fixture
        assert (tmp_dir / "data_dir").read_text() == {
            "foo": "foo",
            "subdir": {"file": "file"},
        }
        scm.add(["data_dir.dvc", ".gitignore"])
        scm.commit("update")

        # going back to v1 pulls only the single file of that version
        scm.checkout("v1")
        dvc.cache.local.clear()
        remove(tmp_dir / "data_dir")
        assert dvc.pull()["fetched"] == 1
        assert (tmp_dir / "data_dir").read_text() == {"subdir": {"file": "file"}}
        dvc.commit(force=True)
        assert dvc.status() == {}
def match_files(fs, entries, expected):
    """Assert that *entries* and *expected* describe the same files.

    Each item is a dict with ``"path"`` and ``"isdir"`` keys; paths are
    normalized via ``fs.normpath`` and ordering is ignored.
    """

    def normalize(listing):
        # reduce a listing to a comparable set of (normalized path, isdir)
        return {(fs.normpath(item["path"]), item["isdir"]) for item in listing}

    assert normalize(entries) == normalize(expected)
class TestLsUrl:
    """``dvc ls-url`` behavior against a generic ``cloud`` fixture."""

    @pytest.mark.parametrize("fname", ["foo", "foo.dvc", "dir/foo"])
    def test_file(self, cloud, fname):
        """Listing a single-file URL yields exactly that file."""
        cloud.gen({fname: "foo contents"})
        fs, fs_path = parse_external_url(cloud.url, cloud.config)
        listing = ls_url(str(cloud / fname), fs_config=cloud.config)
        expected = [{"path": fs.join(fs_path, fname), "isdir": False}]
        match_files(fs, listing, expected)

    def test_dir(self, cloud):
        """Non-recursive listing shows immediate children only."""
        cloud.gen({"dir/foo": "foo contents", "dir/subdir/bar": "bar contents"})
        if not (cloud / "dir").is_dir():
            pytest.skip("Cannot create directories on this cloud")
        fs, _ = parse_external_url(cloud.url, cloud.config)
        listing = ls_url(str(cloud / "dir"), fs_config=cloud.config)
        expected = [
            {"path": "foo", "isdir": False},
            {"path": "subdir", "isdir": True},
        ]
        match_files(fs, listing, expected)

    def test_recursive(self, cloud):
        """Recursive listing flattens the tree to files only."""
        cloud.gen({"dir/foo": "foo contents", "dir/subdir/bar": "bar contents"})
        if not (cloud / "dir").is_dir():
            pytest.skip("Cannot create directories on this cloud")
        fs, _ = parse_external_url(cloud.url, cloud.config)
        listing = ls_url(str(cloud / "dir"), fs_config=cloud.config, recursive=True)
        expected = [
            {"path": "foo", "isdir": False},
            {"path": "subdir/bar", "isdir": False},
        ]
        match_files(fs, listing, expected)

    def test_nonexistent(self, cloud):
        """A missing URL raises URLMissingError."""
        with pytest.raises(URLMissingError):
            ls_url(str(cloud / "dir"), fs_config=cloud.config)
class TestGetUrl:
    """``Repo.get_url`` behavior against a generic ``cloud`` fixture."""

    def test_get_file(self, cloud, tmp_dir):
        """Downloading a single file preserves its contents."""
        cloud.gen({"foo": "foo contents"})

        Repo.get_url(str(cloud / "foo"), "foo_imported", fs_config=cloud.config)

        out = tmp_dir / "foo_imported"
        assert out.is_file()
        assert out.read_text() == "foo contents"

    def test_get_dir(self, cloud, tmp_dir):
        """Downloading a directory recreates it with its files."""
        cloud.gen({"foo": {"foo": "foo contents"}})
        if not (cloud / "foo").is_dir():
            pytest.skip("Cannot create directories on this cloud")

        Repo.get_url(str(cloud / "foo"), "foo_imported", fs_config=cloud.config)

        out = tmp_dir / "foo_imported"
        assert out.is_dir()
        assert (out / "foo").is_file()
        assert (out / "foo").read_text() == "foo contents"

    @pytest.mark.parametrize("dname", [".", "dir", "dir/subdir"])
    def test_get_url_to_dir(self, cloud, tmp_dir, dname):
        """Downloading into an existing directory places the file inside it."""
        cloud.gen({"src": {"foo": "foo contents"}})
        if not (cloud / "src").is_dir():
            pytest.skip("Cannot create directories on this cloud")
        tmp_dir.gen({"dir": {"subdir": {}}})

        Repo.get_url(str(cloud / "src" / "foo"), dname, fs_config=cloud.config)

        dest = tmp_dir / dname
        assert dest.is_dir()
        assert (dest / "foo").read_text() == "foo contents"

    def test_get_url_nonexistent(self, cloud):
        """A missing URL raises URLMissingError."""
        with pytest.raises(URLMissingError):
            Repo.get_url(str(cloud / "nonexistent"), fs_config=cloud.config)
class TestToRemote:
    """``--to-remote`` flows: add/import straight into remote storage.

    Relies on project fixtures: ``tmp_dir``, ``dvc``, ``workspace``
    and ``remote``.
    """

    def test_add_to_remote(self, tmp_dir, dvc, remote, workspace):
        """``dvc add --to-remote`` uploads to remote without a local copy."""
        workspace.gen("foo", "foo")
        url = "remote://workspace/foo"
        [stage] = dvc.add(url, to_remote=True)

        # nothing is materialized locally; only the .dvc file is created
        assert not (tmp_dir / "foo").exists()
        assert (tmp_dir / "foo.dvc").exists()

        assert len(stage.deps) == 0
        assert len(stage.outs) == 1

        hash_info = stage.outs[0].hash_info
        meta = stage.outs[0].meta
        assert hash_info.name == "md5"
        # md5("foo")
        assert hash_info.value == "acbd18db4cc2f85cedef654fccc4a4d8"
        # remote cache layout: files/md5/<first two hex chars>/<rest>
        assert (
            remote / "files" / "md5" / "ac" / "bd18db4cc2f85cedef654fccc4a4d8"
        ).read_text() == "foo"
        assert meta.size == len("foo")

    def test_import_url_to_remote_file(self, tmp_dir, dvc, workspace, remote):
        """``dvc import-url --to-remote`` for a single file."""
        workspace.gen("foo", "foo")
        url = "remote://workspace/foo"
        stage = dvc.imp_url(url, to_remote=True)
        assert stage.deps[0].hash_info.value is not None

        # nothing is materialized locally; only the .dvc file is created
        assert not (tmp_dir / "foo").exists()
        assert (tmp_dir / "foo.dvc").exists()

        # unlike add, import records the source URL as a dependency
        assert len(stage.deps) == 1
        assert stage.deps[0].def_path == url
        assert len(stage.outs) == 1

        hash_info = stage.outs[0].hash_info
        assert hash_info.name == "md5"
        # md5("foo")
        assert hash_info.value == "acbd18db4cc2f85cedef654fccc4a4d8"
        # remote cache layout: files/md5/<first two hex chars>/<rest>
        assert (
            remote / "files" / "md5" / "ac" / "bd18db4cc2f85cedef654fccc4a4d8"
        ).read_text() == "foo"
        assert stage.outs[0].meta.size == len("foo")

    def test_import_url_to_remote_dir(self, tmp_dir, dvc, workspace, remote):
        """``dvc import-url --to-remote`` for a directory tree."""
        import json

        workspace.gen(
            {
                "data": {
                    "foo": "foo",
                    "bar": "bar",
                    "sub_dir": {"baz": "sub_dir/baz"},
                }
            }
        )
        url = "remote://workspace/data"
        stage = dvc.imp_url(url, to_remote=True)

        # nothing is materialized locally; only the .dvc file is created
        assert not (tmp_dir / "data").exists()
        assert (tmp_dir / "data.dvc").exists()

        assert len(stage.deps) == 1
        assert stage.deps[0].def_path == url
        assert len(stage.outs) == 1

        hash_info = stage.outs[0].hash_info
        assert hash_info.name == "md5"
        # directory hashes carry a ".dir" suffix
        assert hash_info.value == "55d05978954d1b2cd7b06aedda9b9e43.dir"

        # the .dir cache object is a JSON manifest of the tree entries
        file_parts = json.loads(
            (
                remote / "files" / "md5" / "55" / "d05978954d1b2cd7b06aedda9b9e43.dir"
            ).read_text()
        )

        assert len(file_parts) == 3
        assert {file_part["relpath"] for file_part in file_parts} == {
            "foo",
            "bar",
            "sub_dir/baz",
        }

        # each listed file must exist in the remote cache; generated file
        # contents equal their relpath (see workspace.gen above)
        for file_part in file_parts:
            md5 = file_part["md5"]
            assert (
                remote / "files" / "md5" / md5[:2] / md5[2:]
            ).read_text() == file_part["relpath"]