View on GitHub


0 mins
Test Coverage
"""Utilities functions."""
import json
import logging
import verboselogs
import os
import sys
import zipfile
from re import finditer

import colorama
import coloredlogs
import cv2
import imageio
import numpy as np
import requests
from PIL import Image

from config import Config as Conf

def read_image(path):
    Read a file image.

    :param path: <string> Path of the image
    :return: <RGB> image
    # Read image
    with open(path, "rb") as file:
        image_bytes = bytearray(
        np_image = np.asarray(image_bytes, dtype=np.uint8)
        image = cv2.imdecode(np_image, cv2.IMREAD_COLOR)
    # See if image loaded correctly
    if image is None:
        Conf.log.error("{} file is not valid image".format(path))
    return image

def write_image(image, path):
    Write a file image to the path (create the directory if needed).

    :param image: <RGB> image to write
    :param path: <string> location where write the image
    :return: None
    dir_path = os.path.dirname(path)
    if dir_path != '':
        os.makedirs(dir_path, exist_ok=True)

    if os.path.splitext(path)[1] not in cv2_supported_extension():
        Conf.log.error("{} invalid extension format.".format(path))

    cv2.imwrite(path, image)

    if not check_image_file_validity(path):
            "Something gone wrong writing {} image file. The final result is not a valid image file.".format(path))

def check_shape(path, shape=None):
    Validate the shape of an image.

    :param image: <RGB> Image to check
    :param shape: <(int,int,int)> Valid shape
    :return: None
    if is_a_supported_animated_file_extension(path):
        #img_shape = imageio.mimread(path)[0][:, :, :3].shape

    if shape is None:
      shape = Conf.desired_shape

    img_shape = read_image(path).shape

    if img_shape != shape:
        Conf.log.error("{} Image is not {}x{}, got shape: {}".format(path, shape[0], shape[1], img_shape))
        Conf.log.error("You should use one of the rescale options or manually resize the image")

def check_image_file_validity(image_path):
    Check is a file is valid image file.

    :param image_path: <string> Path to the file to check
    :return: <Boolean> True if it's an image file
        im =
    except Exception:
        return False
    return True if os.stat(image_path).st_size != 0 else False

def setup_log(log_lvl=logging.INFO):
    Configure a logger.

    :param log_lvl: <loggin.LVL> level of the log
    :return: <Logger> a logger


    log = logging.getLogger(__name__)


      fmt='[%(levelname)s] %(message)s',
          spam=dict(color='gray', faint=True),
          success=dict(color='green', bold=True),
          critical=dict(color='red', bold=True),

    # Disable this f****** spammer
    pil_logger = logging.getLogger('PIL')

    return log

def camel_case_to_str(identifier):
    Return the string representation of a Camel case word.

    :param identifier: <string> camel case word
    :return: a string representation
    matches = finditer('.+?(?:(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])|$)', identifier)
    return " ".join([ for m in matches])

def cv2_supported_extension():
    List of extension supported by cv2.

    :return: <string[]> extensions list
    return [".bmp", ".dib", ".jpeg", ".jpg", ".jpe", ".jp2", ".png",
            ".pbm", ".pgm", "ppm", ".sr", ".ras", ".tiff", ".tif",
            ".BMP", ".DIB", ".JPEG", ".JPG", ".JPE", ".JP2", ".PNG",
            ".PBM", ".PGM", "PPM", ".SR", ".RAS", ".TIFF", ".TIF"]

def ffmpeg_supported_extension():
    List of extension supported by ffmpeg.

    :return: <string[]> extensions list
    return [".mp4", ".MP4", ".webm", ".WEBM", ".mov", ".MOV", ".avi", ".AVI", ".mpg", ".MPG", ".mpeg", ".MPEG", ".mkv", ".MKV", ".wmv", ".WMV"]

def load_json(a):
    Load a json form file or string.

    :param a: <string> Path of the file to load or a json string
    :return: <dict> json structure
    if os.path.isfile(a):
        with open(a, 'r') as f:
            j = json.load(f)
        j = json.loads(str(a))
    return j

def json_to_argv(data):
    Json to args parameters.

    :param data: <json>
    :return: <Dict>
    argv = []
    for k, v in data.items():
        if not isinstance(v, bool):
            argv.extend(["--{}".format(k), str(v)])
        elif v:
    return argv

def dl_file(url, file_path):
    Download a file.

    :param url: <string> url of the file to download
    :param file_path: <string> file path where save the file
    :return: <string> full path of downloaded file
    Conf.log.debug("Download url : {} to path: {}".format(url, file_path))
    response = requests.get(url, stream=True)
    dir_path = os.path.dirname(file_path)
    if dir_path != '':
        os.makedirs(dir_path, exist_ok=True)

    with open(file_path, "wb") as f:

        total_length = response.headers.get('content-length')

        if total_length is None:  # no content length header
            dl = 0
            total_length = int(total_length)
            for data in response.iter_content(chunk_size=4096):
                dl += len(data)
                done = int(50 * dl / total_length)
                print("[{}{}]".format('=' * done, ' ' * (50 - done)), end="\r")
            print(" " * 80, end="\r")
    return file_path

def unzip(zip_path, extract_path):
    Extract a zip.

    :param zip_path: <string> path to zip to extract
    :param extract_path: <string> path to dir where to extract
    :return: None
    Conf.log.debug("Extracting zip : {} to path: {}".format(zip_path, extract_path))
    if not os.path.exists(extract_path):
        os.makedirs(extract_path, exist_ok=True)

    with zipfile.ZipFile(zip_path, "r") as zf:
        uncompress_size = sum((file.file_size for file in zf.infolist()))
        extracted_size = 0

        for file in zf.infolist():
            done = int(50 * extracted_size / uncompress_size)
            print("[{}{}]".format('=' * done, ' ' * (50 - done)), end="\r")
            zf.extract(file, path=extract_path)
            extracted_size += file.file_size
        print(" " * 80, end="\r")

def is_a_supported_image_file_extension(path):
    Return true if the file is an image file supported extensions.

    :param path: <sting> path of the file to check
    :return: <boolean> True if the extension is supported
    return os.path.splitext(path)[1] in cv2_supported_extension() + ffmpeg_supported_extension() + [".gif"]

def is_a_supported_video_file_extension(path):
    Return true if the file is an video file supported extensions.

    :param path: <sting> path of the file to check
    :return: <boolean> True if the extension is supported
    return os.path.splitext(path)[1] in ffmpeg_supported_extension()

def is_a_supported_animated_file_extension(path):
    Return true if the file is an video file supported extensions.

    :param path: <sting> path of the file to check
    :return: <boolean> True if the extension is supported
    return os.path.splitext(path)[1] in ffmpeg_supported_extension() + [".gif"]

def check_url(url):
        Check if a url exists withtout downloading it
        :return: <boolean> True if return url exists
    resp = requests.head(url)
    return resp.status_code < 400