giganticode/codeprep

View on GitHub
codeprep/util/dir.py

Summary

Maintainability
C
1 day
Test Coverage
# SPDX-FileCopyrightText: 2020 Hlib Babii <hlibbabii@gmail.com>
#
# SPDX-License-Identifier: Apache-2.0

import logging
import os
from datetime import datetime

from typing import Optional, List, Generator

from codeprep.config import LIMIT_FILES_ON_LAST_MODIFICATION_CHECK
from codeprep.util.file import has_one_of_extensions

logger = logging.getLogger(__name__)


def walk(path:bytes, extension: Optional[bytes] = None) -> Generator[bytes, None, None]:
    if os.path.isfile(path) and (not extension or path.endswith(extension)):
        yield path
    else:
        for root, dirs, files in os.walk(path):
            for file in files:
                if not extension or file.endswith(extension):
                    yield os.path.join(root, file)


def walk_and_save(path: str, dir_list_path: str, file_list_path: str, return_dirs_instead_of_regular_files: bool,
                  extensions: Optional[List[str]]) -> Generator[bytes, None, None]:
    with open(dir_list_path, 'w') as d, open(file_list_path, 'w') as f:
        path_bin = path.encode()
        extensions_bin = list(map(lambda e: e.encode(), extensions)) if extensions else None
        # we want to list and store all the files a sequences of bytes to avoid problems with different encodings for filenames
        if os.path.isfile(path_bin):
            res = os.path.basename(path_bin)
            f.write(f'{res}\n')
            if not return_dirs_instead_of_regular_files:
                yield res
        else:
            for root, dirs, files in os.walk(path_bin, followlinks=True):
                # we pass bytes to os.walk -> the output are bytes as well
                for dir in dirs:
                    bin_name = os.path.join(os.path.relpath(root, path_bin), dir)
                    d.write(f'{bin_name}\n')
                    if return_dirs_instead_of_regular_files:
                        yield bin_name
                for file in files:
                    bin_name = os.path.join(os.path.relpath(root, path_bin), file)
                    if not extensions or has_one_of_extensions(bin_name, extensions_bin):
                        if not os.path.islink(os.path.join(root, file)):
                            f.write(f'{bin_name}\n')
                            if not return_dirs_instead_of_regular_files:
                                yield bin_name


def get_dir_last_modification(path: str, limit: int = LIMIT_FILES_ON_LAST_MODIFICATION_CHECK) -> datetime:

    def walk_path(path):
        counter = 0
        if os.path.isfile(path) or len(os.listdir(path)) == 0:
            yield os.path.getmtime(path)
        else:
            for root, dirs, files in os.walk(path):
                for dir in dirs:
                    if counter >= limit:
                        return
                    counter += 1
                    yield os.path.getmtime(os.path.join(root, dir))
                for file in files:
                    if counter >= limit:
                        return
                    full_path = os.path.join(root, file)
                    if not os.path.islink(full_path):
                        counter += 1
                        yield os.path.getmtime(full_path)

    mtime = max(walk_path(path))
    return datetime.fromtimestamp(mtime)


def get_timestamp(path: str) -> str:
    last_modif_time = get_dir_last_modification(path)
    return last_modif_time.strftime("%y-%m-%dT%H-%M-%S")