dmyersturnbull/pocketutils

View on GitHub
src/pocketutils/tools/filesys_tools.py

Summary

Maintainability
C
1 day
Test Coverage
# SPDX-FileCopyrightText: Copyright 2020-2023, Contributors to pocketutils
# SPDX-PackageHomePage: https://github.com/dmyersturnbull/pocketutils
# SPDX-License-Identifier: Apache-2.0
"""

"""

import logging
import os
import pathlib
import shutil
import stat
import tempfile
from collections.abc import Generator, Mapping
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path, PurePath
from typing import Any, Self, Unpack

from pocketutils.core.exceptions import PathMissingError, ReadFailedError, WriteFailedError
from pocketutils.core.input_output import Writeable

__all__ = ["FilesysUtils", "FilesysTools", "PathInfo"]

logger = logging.getLogger("pocketutils")


@dataclass(frozen=True, slots=True, kw_only=True)
class PathInfo:
    """
    Info about an extant or nonexistent path as it was at some time.
    Use this to avoid making repeated filesystem calls (e.g. `.is_dir()`):
    None of the properties defined here make OS calls.

    Attributes:
        source: The original path used for lookup; may be a symlink
        resolved: The fully resolved path, or None if it does not exist
        as_of: A datetime immediately before the system calls (system timezone)
        real_stat: `os.stat_result`, or None if the path does not exist
        link_stat: `os.stat_result`, or None if the path is not a symlink
        has_access: Path exists and has the 'a' flag set
        has_read: Path exists and has the 'r' flag set
        has_write: Path exists and has the 'w' flag set

    All the additional properties refer to the resolved path,
    except for [`is_symlink`](pocketutils.tools.filesys_tools.PathInfo.is_symlink),
    [`is_valid_symlink`](pocketutils.tools.filesys_tools.PathInfo.is_valid_symlink),
    and [`is_broken_symlink`](pocketutils.tools.filesys_tools.PathInfo.is_broken_symlink).
    """

    source: Path
    resolved: Path | None
    as_of: datetime
    real_stat: os.stat_result | None
    link_stat: os.stat_result | None
    has_access: bool
    has_read: bool
    has_write: bool

    @property
    def mod_or_create_dt(self: Self) -> datetime | None:
        """
        Returns the modification or access datetime.
        Uses whichever is available: creation on Windows and modification on Unix-like.
        """
        if os.name == "nt":
            return self._get_dt("st_ctime")
        # will work on posix; on java try anyway
        return self._get_dt("st_mtime")

    @property
    def mod_dt(self: Self) -> datetime | None:
        """
        Returns the modification datetime, if known.
        Returns None on Windows or if the path does not exist.
        """
        if os.name == "nt":
            return None
        return self._get_dt("st_mtime")

    @property
    def create_dt(self: Self) -> datetime | None:
        """
        Returns the creation datetime, if known.
        Returns None on Unix-like systems or if the path does not exist.
        """
        if os.name == "posix":
            return None
        return self._get_dt("st_ctime")

    @property
    def access_dt(self: Self) -> datetime | None:
        """
        Returns the access datetime.
        *Should* never return None if the path exists, but not guaranteed.
        """
        return self._get_dt("st_atime")

    @property
    def exists(self: Self) -> bool:
        """
        Returns whether the resolved path exists.
        """
        return self.real_stat is not None

    @property
    def is_file(self: Self) -> bool:
        return self.exists and stat.S_ISREG(self.real_stat.st_mode)

    @property
    def is_dir(self: Self) -> bool:
        return self.exists and stat.S_ISDIR(self.real_stat.st_mode)

    @property
    def is_readable_dir(self: Self) -> bool:
        return self.is_file and self.has_access and self.has_read

    @property
    def is_writeable_dir(self: Self) -> bool:
        return self.is_dir and self.has_access and self.has_write

    @property
    def is_readable_file(self: Self) -> bool:
        return self.is_file and self.has_access and self.has_read

    @property
    def is_writeable_file(self: Self) -> bool:
        return self.is_file and self.has_access and self.has_write

    @property
    def is_block_device(self: Self) -> bool:
        return self.exists and stat.S_ISBLK(self.real_stat.st_mode)

    @property
    def is_char_device(self: Self) -> bool:
        return self.exists and stat.S_ISCHR(self.real_stat.st_mode)

    @property
    def is_socket(self: Self) -> bool:
        return self.exists and stat.S_ISSOCK(self.real_stat.st_mode)

    @property
    def is_fifo(self: Self) -> bool:
        return self.exists and stat.S_ISFIFO(self.real_stat.st_mode)

    @property
    def is_symlink(self: Self) -> bool:
        return self.link_stat is not None

    @property
    def is_valid_symlink(self: Self) -> bool:
        return self.is_symlink and self.exists

    @property
    def is_broken_symlink(self: Self) -> bool:
        return self.is_symlink and not self.exists

    def _get_dt(self: Self, attr: str) -> datetime | None:
        if self.real_stat is None:
            return None
        sec = getattr(self.real_stat, attr)
        return datetime.fromtimestamp(sec).astimezone()


@dataclass(slots=True, frozen=True)
class FilesysUtils:
    """
    Tools for file/directory creation, etc.

    Warning:
        Some functions may be insecure.
    """

    @classmethod
    def verify_can_read_files(
        cls: type[Self],
        *paths: str | Path,
        missing_ok: bool = False,
        attempt: bool = False,
    ) -> None:
        """
        Checks that all files can be written to, to ensure atomicity before operations.

        Args:
            *paths: The files
            missing_ok: Don't raise an error if a path doesn't exist
            attempt: Actually try opening

        Returns:
            ReadFailedError: If a path is not a file (modulo existence) or doesn't have 'W' set
        """
        paths = [Path(p) for p in paths]
        for path in paths:
            if path.exists() and not path.is_file():
                raise ReadFailedError(f"Path {path} is not a file", filename=str(path))
            if (not missing_ok or path.exists()) and not os.access(path, os.R_OK):
                raise ReadFailedError(f"Cannot read from {path}", filename=str(path))
            if attempt:
                try:
                    with open(path):
                        pass
                except OSError:
                    raise WriteFailedError(f"Failed to open {path} for read", filename=str(path))

    @classmethod
    def verify_can_write_files(
        cls: type[Self],
        *paths: str | Path,
        missing_ok: bool = False,
        attempt: bool = False,
    ) -> None:
        """
        Checks that all files can be written to, to ensure atomicity before operations.

        Args:
            *paths: The files
            missing_ok: Don't raise an error if a path doesn't exist
            attempt: Actually try opening

        Returns:
            WriteFailedError: If a path is not a file (modulo existence) or doesn't have 'W' set
        """
        paths = [Path(p) for p in paths]
        for path in paths:
            if path.exists() and not path.is_file():
                raise WriteFailedError(f"Path {path} is not a file", filename=str(path))
            if (not missing_ok or path.exists()) and not os.access(path, os.W_OK):
                raise WriteFailedError(f"Cannot write to {path}", filename=str(path))
            if attempt:
                try:
                    with open(path, "a"):  # or w
                        pass
                except OSError:
                    raise WriteFailedError(f"Failed to open {path} for write", filename=str(path))

    @classmethod
    def verify_can_write_dirs(
        cls: type[Self],
        *paths: str | PurePath,
        missing_ok: bool = False,
    ) -> None:
        """
        Checks that all directories can be written to, to ensure atomicity before operations.

        Args:
            *paths: The directories
            missing_ok: Don't raise an error if a path doesn't exist

        Returns:
            WriteFailedError: If a path is not a directory (modulo existence) or doesn't have 'W' set
        """
        paths = [Path(p) for p in paths]
        for path in paths:
            if path.exists() and not path.is_dir():
                raise WriteFailedError(f"Path {path} is not a dir", filename=str(path))
            if missing_ok and not path.exists():
                continue
            if not os.access(path, os.W_OK):
                raise WriteFailedError(f"{path} lacks write permission", filename=str(path))
            if not os.access(path, os.X_OK):
                raise WriteFailedError(f"{path} lacks access permission", filename=str(path))

    def get_info(self: Self, path: PurePath | str, *, expand_user: bool = False, strict: bool = False) -> PathInfo:
        path = Path(path)
        has_ignore_error = hasattr(pathlib, "_ignore_error")
        if not has_ignore_error:
            logger.debug("No _ignore_error found; some OSErrors may be suppressed")
        resolved = None
        real_stat = None
        has_access = False
        has_read = False
        has_write = False
        link_stat = None
        as_of = datetime.now(tz=UTC).astimezone()
        if has_ignore_error or path.is_symlink() or path.exists():
            link_stat = self.__stat_raw(path)
        if link_stat is not None:
            resolved = path.expanduser().resolve(strict=strict) if expand_user else path.resolve(strict=strict)
            real_stat = self.__stat_raw(resolved) if stat.S_ISLNK(link_stat.st_mode) else link_stat
            has_access = os.access(path, os.X_OK, follow_symlinks=True)
            has_read = os.access(path, os.R_OK, follow_symlinks=True)
            has_write = os.access(path, os.W_OK, follow_symlinks=True)
            if not stat.S_ISLNK(link_stat.st_mode):
                link_stat = None
        return PathInfo(
            source=path,
            resolved=resolved,
            as_of=as_of,
            real_stat=real_stat,
            link_stat=link_stat,
            has_access=has_access,
            has_read=has_read,
            has_write=has_write,
        )

    def prep_dir(self: Self, path: PurePath | str, *, exist_ok: bool = True) -> bool:
        """
        Prepares a directory by making it if it doesn't exist.
        If `exist_ok` is False, calls `logger.warning` if `path` already exists
        """
        path = Path(path)
        exists = path.exists()
        # On some platforms we get generic exceptions like permissions errors,
        # so these are better
        if exists and not path.is_dir():
            raise PathMissingError(filename=str(path))
        if exists and not exist_ok:
            logger.warning(f"Directory {path} already exists")
        if not exists:
            # NOTE! exist_ok in mkdir throws an error on Windows
            path.mkdir(parents=True)
        return exists

    def prep_file(self: Self, path: PurePath | str, *, exist_ok: bool = True) -> None:
        """
        Prepares a file path by making its parent directory.
        Same as `pathlib.Path.mkdir` but makes sure `path` is a file if it exists.
        """
        # On some platforms we get generic exceptions like permissions errors, so these are better
        path = Path(path)
        # check for errors first; don't make the dirs and then fail
        if path.exists() and not path.is_file() and not path.is_symlink():
            raise PathMissingError(filename=str(path))
        Path(path.parent).mkdir(parents=True, exist_ok=exist_ok)

    def delete_surefire(self: Self, path: PurePath | str) -> Exception | None:
        """
        Deletes files or directories cross-platform, but working around multiple issues in Windows.

        Returns:
            None, or an Exception for minor warnings

        Raises:
            IOError: If it can't delete
        """
        # we need this because of Windows
        path = Path(path)
        logger.debug(f"Permanently deleting {path} ...")
        chmod_err = None
        try:
            os.chmod(str(path), stat.S_IRWXU)
        except Exception as e:
            chmod_err = e
        # another reason for returning exception:
        # We don't want to interrupt the current line being printed like in slow_delete
        if path.is_dir():
            shutil.rmtree(str(path), ignore_errors=True)  # ignore_errors because of Windows
            try:
                path.unlink(missing_ok=True)  # again, because of Windows
            except OSError:  # noqa: S110
                pass  # almost definitely because it doesn't exist
        else:
            path.unlink(missing_ok=True)
        logger.debug(f"Permanently deleted {path}")
        return chmod_err

    def trash(self: Self, path: PurePath | str, trash_dir: PurePath | str) -> None:
        """
        Trash a file or directory.

        Args:
            path: The path to move to the trash
            trash_dir: If None, uses
            [`guess_trash`](pocketutils.tools.path_tools.PathTools.guess_trash).
        """
        logger.debug(f"Trashing {path} to {trash_dir} ...")
        shutil.move(str(path), str(trash_dir))
        logger.debug(f"Trashed {path} to {trash_dir}")

    def try_delete(self: Self, path: Path, *, bound: type[Exception] = PermissionError) -> None:
        """
        Try to delete a file (probably temp file), if it exists, and log any `PermissionError`.
        """
        path = Path(path)
        # noinspection PyBroadException
        try:
            path.unlink(missing_ok=True)
        except bound:
            logger.error(f"Permission error preventing deleting {path}")

    def temp_path(self: Self, path: PurePath | str | None = None, **kwargs) -> Generator[Path, None, None]:
        """
        Makes a temporary Path. Won't create `path` but will delete it at the end.
        If `path` is None, will use `tempfile.mkstemp`.
        """
        if path is None:
            _, path = tempfile.mkstemp()
        try:
            yield Path(path, **kwargs)
        finally:
            Path(path).unlink()

    def temp_file(
        self: Self,
        path: PurePath | str | None = None,
        *,
        spooled: bool = False,
        **kwargs: Unpack[Mapping[str, Any]],
    ) -> Generator[Writeable, None, None]:
        """
        Simple wrapper around `tempfile` functions.
        Wraps `TemporaryFile`, `NamedTemporaryFile`, and `SpooledTemporaryFile`.
        """
        if spooled:
            with tempfile.SpooledTemporaryFile(**kwargs) as x:
                yield x
        elif path is None:
            with tempfile.TemporaryFile(**kwargs) as x:
                yield x
        else:
            with tempfile.NamedTemporaryFile(str(path), **kwargs) as x:
                yield x

    def temp_dir(self: Self, **kwargs: Unpack[Mapping[str, Any]]) -> Generator[Path, None, None]:
        with tempfile.TemporaryDirectory(**kwargs) as x:
            yield Path(x)

    def __stat_raw(self: Self, path: Path) -> os.stat_result | None:
        try:
            return path.lstat()
        except OSError as e:
            if hasattr(pathlib, "_ignore_error") and not pathlib._ignore_error(e):
                raise e
        return None


FilesysTools = FilesysUtils()