borgbackup/borg

View on GitHub
src/borg/testsuite/archiver/serve_cmd.py

Summary

Maintainability
A
0 mins
Test Coverage
import os
import subprocess
import tempfile
import time

import pytest
import platformdirs

from . import exec_cmd
from ...platformflags import is_win32
from ...helpers import get_runtime_dir


def have_a_short_runtime_dir(mp):
    # under pytest, we use BORG_BASE_DIR to keep stuff away from the user's normal borg dirs.
    # this leads to a very long get_runtime_dir() path - too long for a socket file!
    # thus, we override that again via BORG_RUNTIME_DIR to get a shorter path.
    mp.setenv("BORG_RUNTIME_DIR", os.path.join(platformdirs.user_runtime_dir(), "pytest"))


@pytest.fixture
def serve_socket(monkeypatch):
    have_a_short_runtime_dir(monkeypatch)
    # use a random unique socket filename, so tests can run in parallel.
    socket_file = tempfile.mktemp(suffix=".sock", prefix="borg-", dir=get_runtime_dir())
    with subprocess.Popen(["borg", "serve", f"--socket={socket_file}"]) as p:
        while not os.path.exists(socket_file):
            time.sleep(0.01)  # wait until socket server has started
        yield socket_file
        p.terminate()


@pytest.mark.skipif(is_win32, reason="hangs on win32")
def test_with_socket(serve_socket, tmpdir, monkeypatch):
    have_a_short_runtime_dir(monkeypatch)
    repo_path = str(tmpdir.join("repo"))

    ret, output = exec_cmd(f"--socket={serve_socket}", f"--repo=socket://{repo_path}", "rcreate", "--encryption=none")
    assert ret == 0

    ret, output = exec_cmd(f"--socket={serve_socket}", f"--repo=socket://{repo_path}", "rinfo")
    assert ret == 0
    assert "Repository ID: " in output

    monkeypatch.setenv("BORG_DELETE_I_KNOW_WHAT_I_AM_DOING", "YES")
    ret, output = exec_cmd(f"--socket={serve_socket}", f"--repo=socket://{repo_path}", "rdelete")
    assert ret == 0


@pytest.mark.skipif(is_win32, reason="hangs on win32")
def test_socket_permissions(serve_socket):
    st = os.stat(serve_socket)
    assert st.st_mode & 0o0777 == 0o0770  # user and group are permitted to use the socket