JrGoodle/clowder

View on GitHub
clowder/util/filesystem.py

Summary

Maintainability
A
0 mins
Test Coverage
"""File system utilities

.. codeauthor:: Joe DeCapo <joe@polka.cat>

"""

# import errno
import fnmatch
import os
import re
import shutil
from pathlib import Path
from typing import List

import clowder.util.command as cmd


def list_subdirectories(path: Path, recursive: bool = False) -> List[Path]:
    if recursive:
        return [Path(info[0]) for info in os.walk(path)]
    else:
        paths = [Path(str(p)) for p in os.scandir(path)]
        return [f for f in paths if f.is_dir()]


def find_files_with_extension(directory: Path, extension: str) -> List[Path]:
    all_files = []
    for root, _, files in os.walk(directory):
        matching_files = [Path(root, f) for f in files if f.lower().endswith(f'.{extension.lower()}')]
        all_files += matching_files
    return all_files


def find_files(directory: Path, name: str) -> List[Path]:
    all_files = []
    for root, dirs, files in os.walk(directory):
        matching_files = [Path(root, f) for f in files if f.lower() == name.lower()]
        matching_dirs = [Path(root, d) for d in dirs if d.lower() == name.lower()]
        all_files += (matching_files + matching_dirs)
    return all_files


def find_files_containing_match(directory: Path, match: str) -> List[Path]:
    all_files = []
    for root, dirs, files in os.walk(directory):
        matching_files = [Path(root, f) for f in files if match.lower() in f.lower()]
        matching_dirs = [Path(root, d) for d in dirs if match.lower() in d.lower()]
        all_files += (matching_files + matching_dirs)
    return all_files


def find_rars(directory: Path, match_all: bool = False) -> List[Path]:
    all_rar_files = []
    for root, dirs, files in os.walk(directory):
        rar_files = [Path(root, f) for f in files if f.endswith('.rar')]
        if match_all:
            all_rar_files += rar_files
            continue
        tmp_files = []
        for file in rar_files:
            if file.name[:-4] in dirs:
                continue
            tmp_files.append(file)
            r = re.compile(r"^.+[.]part[0-9][0-9][0-9]?[.]rar$")
            if all(r.match(f.name) for f in rar_files):
                break
        all_rar_files += tmp_files
    return all_rar_files


def make_dir(dir_path: Path, exist_ok: bool = False) -> Path:
    os.makedirs(dir_path, exist_ok=exist_ok)
    return dir_path


def move(input_path: Path, output_path: Path) -> None:
    shutil.move(str(input_path), str(output_path))


def remove_dir(dir_path: Path, ignore_errors: bool = False) -> None:
    shutil.rmtree(str(dir_path), ignore_errors=ignore_errors)


def remove_file(file: Path) -> None:
    os.remove(str(file))


def remove(path: Path) -> None:
    if path.is_symlink():
        path.unlink()
    elif path.is_dir():
        remove_dir(path)
    elif path.is_file():
        remove_file(path)


def is_relative_to(path: Path, prefix: Path) -> bool:
    return str(path).startswith(str(prefix))


def replace_path_prefix(path: Path, old_prefix: Path, new_prefix: Path):
    # assert path.is_absolute()
    # assert path.is_relative_to(old_prefix)
    relative_path = path.relative_to(old_prefix)
    return new_prefix / relative_path


def listdir(directory: Path) -> List[Path]:
    files = os.listdir(directory)
    return [directory / f for f in files]


def listdir_matching(directory: Path, pattern: str) -> List[Path]:
    files = os.listdir(directory)
    matches = fnmatch.filter(files, pattern)
    return [directory / m for m in matches]


def unar(file: Path) -> None:
    # escaped_file_name = str(file).replace("'", r"\'")
    cmd.run(f'unar "{file}"', cwd=file.parent)


def create_backup_file(file: Path) -> None:
    """Copy file to {file}.backup

    :param Path file: File path to copy
    """

    shutil.copyfile(str(file), f"{str(file)}.backup")


def restore_from_backup_file(file: Path) -> None:
    """Copy {file}.backup to file

    :param Path file: File path to copy
    """

    shutil.copyfile(f"{file}.backup", file)


# def make_dir(directory: Path, check: bool = True) -> None:
#     """Make directory if it doesn't exist
#
#     :param str directory: Directory path to create
#     :param bool check: Whether to raise exceptions
#     """
#
#     if directory.exists():
#         return
#
#     try:
#         os.makedirs(str(directory))
#     except OSError as err:
#         if err.errno == errno.EEXIST:
#             LOG.error(f"Directory already exists at {Format.path(directory)}")
#         else:
#             LOG.error(f"Failed to create directory {Format.path(directory)}")
#         if check:
#             raise


# def remove_directory(dir_path: Path, check: bool = True) -> None:
#     """Remove directory at path
#
#     :param str dir_path: Path to directory to remove
#     :param bool check: Whether to raise errors
#     """
#
#     try:
#         shutil.rmtree(dir_path)
#     except shutil.Error:
#         LOG.error(f"Failed to remove directory {Format.path(dir_path)}")
#         if check:
#             raise


def has_contents(path: Path) -> bool:
    return not is_empty_dir(path)


def is_empty_dir(path: Path) -> bool:
    if not path.exists() or not path.is_dir():
        raise Exception(f"Directory at {path} doesn't exist")
    if not os.listdir(path):
        return True
    else:
        return False


def create_file(path: Path, contents: str) -> None:
    with path.open('w') as f:
        f.write(contents)
    assert path.is_file()
    assert not path.is_symlink()
    assert path.read_text().strip() == contents.strip()


def symlink_to(path: Path, target: Path) -> None:
    parent = path.parent
    fd = os.open(parent, os.O_DIRECTORY)
    os.symlink(target, path, dir_fd=fd)
    os.close(fd)
    assert path.exists()
    assert path.is_symlink()
    assert is_relative_symlink_from_to(path, str(target))


def symlink_relative_to(source: Path, target: Path, relative_to: Path) -> None:
    """Create relative symlink

    :param Path source: File to create symlink pointing to
    :param Path target: Symlink location
    :param Path relative_to: Directory source is relative to
    :raise ExistingFileError:
    :raise MissingSourceError:
    """

    source = source.relative_to(relative_to)
    try:
        path = target.parent
        fd = os.open(path, os.O_DIRECTORY)
        os.symlink(source, target, dir_fd=fd)
        os.close(fd)
    except OSError:
        # LOG.error(f"Failed to symlink file {Format.path(target)} -> {Format.path(source)}")
        raise


def copy_file(path: Path, destination: Path) -> None:
    shutil.copyfile(path, destination)
    assert destination.is_file()
    assert not destination.is_symlink()


def is_relative_symlink_from_to(symlink: Path, destination: str) -> bool:
    if not symlink.is_symlink():
        return False
    path = symlink.parent
    resolved_symlink = symlink.resolve()
    if not resolved_symlink.samefile(path / destination):
        return False
    link = os.readlink(symlink)
    is_relative = not Path(link).is_absolute()
    return is_relative


def copy_directory(from_dir: Path, to_path: Path):
    # TODO: Replace rmdir() with copytree(dirs_exist_ok=True) when support for Python 3.7 is dropped
    to_path.rmdir()
    shutil.copytree(from_dir, to_path, symlinks=True)


def copy_file(from_path: Path, to_path: Path):
    shutil.copyfile(from_path, to_path)


def copy(from_path: Path, to_path: Path):
    if from_path.is_dir():
        copy_directory(from_path, to_path)
    else:
        copy_file(from_path, to_path)