getindata/data-pipelines-cli

View on GitHub
data_pipelines_cli/filesystem_utils.py

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
from __future__ import annotations

import os
import pathlib
from typing import Dict, Set, Union

import fsspec
from fsspec import AbstractFileSystem

from .cli_utils import echo_subinfo
from .errors import DataPipelinesError


class LocalRemoteSync:
    """Synchronizes local directory with a cloud storage's one.

    Files on both sides are matched by their *suffix*: the ``/``-separated
    path of a file relative to the synchronized root, with a leading ``/``
    (for example ``/models/users.sql``).
    """

    local_fs: AbstractFileSystem
    """FS representing local directory"""
    remote_fs: AbstractFileSystem
    """FS representing the cloud storage"""
    local_path_str: str
    """Path to local directory"""
    remote_path_str: str
    """Path of the cloud storage directory, as resolved by fsspec"""
    _local_directory_suffixes: Set[str]
    """Suffixes of the local files collected by the most recent push"""

    def __init__(
        self,
        local_path: Union[str, os.PathLike[str]],
        remote_path: str,
        remote_kwargs: Dict[str, str],
    ) -> None:
        """
        :param local_path: Path to the local directory to be synchronized
        :type local_path: Union[str, os.PathLike[str]]
        :param remote_path: Path/URI of the cloud storage directory
        :type remote_path: str
        :param remote_kwargs: Keyword arguments passed to fsspec when \
        creating the remote filesystem
        :type remote_kwargs: Dict[str, str]
        :raises DataPipelinesError: ``local_path`` does not exist
        """
        if not pathlib.Path(local_path).exists():
            raise DataPipelinesError(f"{local_path} does not exists. Run 'dp compile' before.")

        self.local_path_str = str(local_path).rstrip("/")
        self.local_fs = fsspec.filesystem("file")
        self.remote_fs, self.remote_path_str = fsspec.core.url_to_fs(
            remote_path.rstrip("/"), **remote_kwargs
        )
        self._local_directory_suffixes = set()

    def sync(self, delete: bool = True) -> None:
        """
        Send local files to the remote directory and (optionally) delete
        unnecessary ones.

        :param delete: Whether to delete remote files that are \
        no longer present in local directory
        :type delete: bool
        """
        self._push_sync()
        if delete:
            self._delete()

    def _local_suffix(self, local_file: str) -> str:
        """
        Return the suffix of ``local_file`` relative to the local root.

        The suffix is computed with ``os.path.relpath`` instead of slicing
        off ``len(self.local_path_str)`` characters: the local filesystem
        reports files in a normalized absolute form, so slicing yields a
        wrong suffix whenever the root was given as a relative or
        non-normalized path (``build/dag``, ``./build/dag``, ...).

        :param local_file: Path of a file as reported by the local filesystem
        :type local_file: str
        :return: ``/``-separated suffix with a leading ``/``, or an empty \
        string when ``local_file`` is the root itself
        :rtype: str
        """
        # ``local_path_str`` is empty only when the root was "/" (the trailing
        # slash gets stripped in ``__init__``); fall back to "/" in that case.
        root = self.local_path_str or "/"
        relative = os.path.relpath(local_file, root)
        if relative == os.curdir:
            # The root is a single file rather than a directory.
            return ""
        return "/" + relative.replace(os.sep, "/")

    def _push_sync(self) -> None:
        """Push every file to the remote."""

        # Every local file is uploaded unconditionally; nothing here compares
        # it with the remote copy first.
        self._local_directory_suffixes = set()
        for local_file in self.local_fs.find(self.local_path_str):
            local_file_suffix = self._local_suffix(local_file)
            self._local_directory_suffixes.add(local_file_suffix)
            remote_path_with_suffix = self.remote_path_str + local_file_suffix
            echo_subinfo(f"- Pushing {local_file} to {remote_path_with_suffix}")
            self.remote_fs.put_file(local_file, remote_path_with_suffix)

    def _delete(self) -> None:
        """
        Remove every file from remote that's not local.

        Relies on the suffixes collected by the most recent ``_push_sync``;
        a remote file whose suffix is not among them gets removed.
        """
        remote_prefix_length = len(self.remote_path_str)
        for remote_file in self.remote_fs.find(self.remote_path_str):
            remote_file_suffix = remote_file[remote_prefix_length:]
            if remote_file_suffix not in self._local_directory_suffixes:
                self.remote_fs.rm(remote_file)