kyle0x54/medvision

View on GitHub
medvision/util/fileutil.py

Summary

Maintainability
A
35 mins
Test Coverage
import collections
from enum import Enum, unique, auto
import hashlib
import os
from pathlib import Path
import shutil
from natsort import natsorted
import medvision as mv


# Module-level shorthand aliases for the os.path type predicates, so the
# rest of this module can call them without the `os.path.` prefix.
isdir = os.path.isdir
isfile = os.path.isfile


def listdir(path):
    """Return the entry names of directory `path`, ordered by `natsorted`.

    Args:
        path (str): directory to list.

    Return:
        (list[str]): entry names as reported by `os.listdir`, naturally
            sorted.
    """
    entries = os.listdir(path)
    return natsorted(entries)


# Module-level shorthand aliases for common os.path helpers.
joinpath = os.path.join
basename = os.path.basename
abspath = os.path.abspath
splitext = os.path.splitext


def parentdir(path):
    """Return the directory that contains `path`.

    `path` is made absolute first, so a relative path resolves against the
    current working directory.

    Args:
        path (str): file or directory path.

    Return:
        (str): absolute path of the parent directory.
    """
    return os.path.dirname(abspath(path))


def filetitle(path):
    """Return the file name of `path` without directory or last extension.

    Args:
        path (str): file path.

    Return:
        (str): base name with its final extension stripped, e.g.
            'a/b/c.tar.gz' gives 'c.tar'.
    """
    name = os.path.basename(path)
    title, _ext = os.path.splitext(name)
    return title


def change_suffix(path, new_suffix):
    """Replace the last extension of `path` with `new_suffix`.

    Args:
        path (str): file path whose extension is to be replaced.
        new_suffix (str): replacement extension. It is appended verbatim,
            so include the leading dot (e.g. '.png').

    Return:
        (str): `path` with its final extension swapped for `new_suffix`.
            A path without an extension simply gets `new_suffix` appended.
    """
    # Use this module's own alias rather than going back through the
    # package namespace for the very same helper.
    return splitext(path)[0] + new_suffix


# Shell-flavoured shorthand aliases for the stdlib copy / remove / move
# primitives.
cp = shutil.copy
rm = os.remove
cptree = shutil.copytree
rmtree = shutil.rmtree
move = shutil.move


def symlink(src, dst, overwrite=True, **kwargs):
    """Create a symbolic link at `dst` pointing to `src`.

    Args:
        src (str): link target.
        dst (str): path of the link to create.
        overwrite (bool): if true, an existing entry at `dst` (including a
            broken symlink) is removed with `os.remove` first.
        **kwargs: forwarded unchanged to `os.symlink`.
    """
    # lexists() is used so that a dangling symlink at dst is also detected.
    if os.path.lexists(dst):
        if overwrite:
            os.remove(dst)
    os.symlink(src, dst, **kwargs)


def mkdirs(path, mode=0o777):
    """Create directory `path` and any missing parents.

    A leading '~' is expanded and the path is made absolute before
    creation. An already existing directory is not an error.

    Args:
        path (str): directory to create.
        mode (int): mode passed to `os.makedirs`.
    """
    target = abspath(os.path.expanduser(path))
    os.makedirs(target, mode=mode, exist_ok=True)


def empty_dir(path):
    """Remove everything inside directory `path`, keeping the directory.

    The directory itself is left in place, so its permissions are kept and
    a `path` that is a symlink to a directory works as well (removing and
    recreating the directory would raise on a symlink and reset the mode).

    Args:
        path (str): existing directory to be emptied.

    Raises:
        AssertionError: if `path` is not an existing directory.
    """
    # Kept as an assert so callers still see AssertionError for a bad path.
    assert isdir(path)

    # Snapshot the children first so the directory is not mutated while
    # the scandir iterator is still open.
    with os.scandir(path) as entries:
        children = list(entries)

    for child in children:
        if child.is_dir(follow_symlinks=False):
            rmtree(child.path)
        else:
            # Regular files and symlinks (also symlinks to directories).
            os.remove(child.path)


def non_overwrite_cp(src, dst):
    """Copy `src` to `dst`, refusing to replace an existing file.

    Args:
        src (str): source file path.
        dst (str): destination file path, or an existing directory into
            which the file is copied under its own base name.

    Return:
        whatever `cp` returns for the copy.

    Raises:
        FileExistsError: if the file the copy would produce already exists.
    """
    # Work out which file the copy would end up writing.
    target = joinpath(dst, basename(src)) if isdir(dst) else dst
    if isfile(target):
        raise FileExistsError('target file {} already exists'.format(target))
    return cp(src, dst)


def copyfiles(src_paths, dst_dir, src_root=None, non_overwrite=False):
    """Copy several files into one destination directory.

    Args:
        src_paths: array-like of source paths (checked with
            `mv.isarrayinstance`).
        dst_dir (str): existing directory that receives the copies.
        src_root (str, optional): if given, every entry of `src_paths` is
            joined onto this root before copying.
        non_overwrite (bool): if true, copy with `non_overwrite_cp`
            instead of `cp`.

    Raises:
        AssertionError: if `dst_dir` is not a directory or `src_paths`
            fails the `mv.isarrayinstance` check.
    """
    assert isdir(dst_dir)
    assert mv.isarrayinstance(src_paths)

    if non_overwrite:
        copy_one = non_overwrite_cp
    else:
        copy_one = cp

    for entry in src_paths:
        source = entry if src_root is None else joinpath(src_root, entry)
        copy_one(source, dst_dir)


@unique
class GlobMode(Enum):
    """Kind of filesystem entries that `glob()` should return."""
    FILE = auto()  # keep only paths for which isfile() is true
    DIR = auto()  # keep only paths for which isdir() is true
    ALL = auto()  # keep every matched path, no filtering


def glob(root, pattern='*', mode=GlobMode.FILE, recursive=False):
    """List paths under `root` that match a glob pattern.

    Args:
        root (str): directory to search. A leading '~' is expanded and the
            path is made absolute before searching.
        pattern (str): pattern handed to `Path.glob` / `Path.rglob`.
        mode (GlobMode): FILE keeps only files, DIR keeps only
            directories, anything else applies no filtering.
        recursive (bool): if true, search with `Path.rglob`, otherwise
            with `Path.glob`.

    Return:
        (list[str]): matched paths as strings, ordered by `natsorted`.
    """
    base = Path(os.path.abspath(os.path.expanduser(root)))
    search = base.rglob if recursive else base.glob
    matches = [str(hit) for hit in search(pattern)]

    if mode == GlobMode.FILE:
        matches = [hit for hit in matches if isfile(hit)]
    elif mode == GlobMode.DIR:
        matches = [hit for hit in matches if isdir(hit)]
    # Any other mode (GlobMode.ALL) keeps every match.

    return natsorted(matches)


def compute_md5_str(file_path):
    """Compute the MD5 digest of a file's content.

    Args:
        file_path (str): path of the file to hash.

    Return:
        (str or None): lowercase hexadecimal MD5 digest, or None if
            `file_path` is not an existing regular file.
    """
    if not isfile(file_path):
        return None

    digest = hashlib.md5()
    with open(file_path, 'rb') as f:
        # Feed the hash in fixed-size chunks so that large files are never
        # loaded into memory in one piece.
        for chunk in iter(lambda: f.read(64 * 1024), b''):
            digest.update(chunk)

    # hexdigest() already yields a lowercase str.
    return digest.hexdigest()


def find_duplicated_files(data_dir, pattern='*'):
    """ Find duplicated files in specified directory.

    Files are compared by the MD5 digest of their content. The directory
    is scanned recursively.

    Args:
        data_dir (str): specified directory to be scanned.
        pattern: refer to 'glob()'.

    Return:
        (list[tuple]): one tuple per set of identical files, holding the
            paths of all files in that set (two or more). Groups appear
            in the order their first member was listed, and paths inside
            a group keep the listing order.
    """
    filepaths = glob(data_dir, pattern, mode=GlobMode.FILE, recursive=True)

    # Group paths by digest in a single pass instead of rescanning the
    # whole digest list once per duplicated digest.
    paths_by_md5 = collections.defaultdict(list)
    for filepath in filepaths:
        md5 = compute_md5_str(filepath)
        if md5 is None:
            # The file could not be hashed (e.g. it vanished after being
            # listed); such files are not duplicates of one another.
            continue
        paths_by_md5[md5].append(filepath)

    return [tuple(paths) for paths in paths_by_md5.values()
            if len(paths) > 1]