tidalf/plugin.audio.qobuz

View on GitHub
resources/lib/qobuz/image.py

Summary

Maintainability
A
0 mins
Test Coverage
import functools
import os
import requests

from qobuz import config
from qobuz.debug import getLogger
from qobuz.util.common import get_default_image_size
from qobuz.util.file import unlink
from qobuz.util.hash import hashit
from qobuz.util.random import randrange

# File-name template for a generated mosaic; the fields are filled in by
# _combine_factory_final_path().
COMBINED_COVER_FMT = 'cover-{nid}-{size_w}-{size_h}-{count}-combine.jpg'
# NOTE(review): not referenced anywhere in the visible part of this module --
# confirm whether another module still uses it.
TMPIMG_FMT = 'tmp-img.jpg'
# File-name template for a downloaded cover; get_remote_image() fills the
# placeholder with hashit(url).
HASHED_ALBUM_COVER_FMT = 'cache-album-cover-{}.jpg'

logger = getLogger(__name__)
# Set to True below only when the PIL import succeeds.
PIL_AVAILABLE = False

try:
    from PIL import Image
    PIL_AVAILABLE = True
except ImportError as e:
    # A missing PIL is not fatal: the module still loads and PIL_AVAILABLE
    # stays False.
    logger.error('Cannot import PIL library')

def _mywalk(path):
    ''' Walk path recursively and yield a (file name, full path) tuple
        for every file found
    '''
    for root, _subdirs, files in os.walk(path):
        for entry in files:
            full_path = os.path.join(root, entry)
            yield entry, full_path


def _find_all_combined_images(covers_path):
    ''' Yield the full path of every file below covers_path whose name
        ends with '.jpg'
    '''
    for file_name, file_path in _mywalk(covers_path):
        if file_name.endswith('.jpg'):
            yield file_path


def cleanfs_combined_covers():
    ''' Remove every '.jpg' file found under config.path.combined_covers

        get_remote_image() stores its downloaded covers in that same
        directory, so those cached files are removed as well.
    '''
    for cover_file in _find_all_combined_images(config.path.combined_covers):
        unlink(cover_file)


def next_image_generator_factory(images):
    ''' Endless generator over images: yields each entry in order and
        starts again at the first one after the last.

        The length is read once, when iteration starts. An empty sequence
        raises IndexError on the first next().
    '''
    total = len(images)
    cursor = 0
    while True:
        # Wrap around once the end of the sequence has been passed
        cursor = cursor if cursor < total else 0
        yield images[cursor]
        cursor += 1


def get_remote_image(url, timeout=10):
    ''' Download the image at url into the combined covers directory and
        return the path of the local file.

        The file name is derived from hashit(url), so a url that was
        already downloaded is served from disk without any network access.

        :param url: address of the remote image
        :param timeout: seconds passed to requests.get as timeout; without
            one a stalled server would block the caller forever
        :return: local file path, or None when the server does not answer
            with status 200
        :raises: whatever requests or the file write raises; a partially
            written file is removed before the error propagates
    '''
    covers_path = config.path.combined_covers
    hashed_filename = HASHED_ALBUM_COVER_FMT.format(
        hashit(url))
    new_path = os.path.join(covers_path, hashed_filename)

    if os.path.exists(new_path):
        return new_path

    img_request = requests.get(url, stream=True, timeout=timeout)
    try:
        if img_request.status_code != 200:
            logger.warn('GetRemoteImageError %s', img_request.status_code)
            return None
        try:
            with open(new_path, 'wb') as write_handle:
                write_handle.writelines(img_request.iter_content(1024))
        except Exception:
            # A truncated file would be returned as a valid cache hit by
            # every later call, so drop it before re-raising.
            if os.path.exists(new_path):
                os.remove(new_path)
            raise
    finally:
        # With stream=True the connection is held until the response is
        # closed.
        img_request.close()
    return new_path


def _resize_image(img_path, thumb_size_w):
    ''' Open img_path and return a copy scaled to thumb_size_w pixels
        wide, with the height scaled by the same ratio.

        :param img_path: path of the image file to open
        :param thumb_size_w: target width in pixels
        :return: the resized PIL image, or None when opening or resizing
            fails (the error is logged)
    '''
    try:
        part = Image.open(img_path)
        # Floor division: the size passed to resize() must hold integers,
        # and '/' yields a float on Python 3.
        new_height = thumb_size_w * part.height // part.width
        # ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter
        # under its current name. Fall back for old PIL builds without it.
        resample = getattr(Image, 'LANCZOS', None)
        if resample is None:
            resample = Image.ANTIALIAS
        return part.resize((thumb_size_w, new_height), resample)
    except Exception as e:
        logger.error('ResizeImageError %s', e)
    return None


def _combine_factory_final_path(count, nid, img_size):
    ''' Return the path, inside the combined covers directory, of the
        mosaic file for this node id, image size and image count.

        img_size is a (width, height) pair.
    '''
    width, height = img_size
    target_dir = config.path.combined_covers
    mosaic_name = COMBINED_COVER_FMT.format(
        nid=nid, size_w=width, size_h=height, count=count)
    return os.path.join(target_dir, mosaic_name)


def _combine_factory_build_one(thumb_size,
                               image_path_generator,
                               new_image,
                               rowcol):
    ''' Paste the next image from image_path_generator into one cell of
        new_image.

        :param thumb_size: (width, height) of one cell in pixels
        :param image_path_generator: iterator yielding local paths or
            http(s) urls; urls are downloaded first
        :param new_image: PIL image that receives the thumbnail
        :param rowcol: pair of cell indexes, multiplied by thumb_size to
            get the paste offset

        A thumbnail that cannot be produced or pasted is logged and
        skipped, so the cell is left as it was.
    '''
    img_path = next(image_path_generator)
    if img_path.startswith('http'):
        img_path = get_remote_image(img_path)
    image = _resize_image(img_path, thumb_size[0])
    if image is None:
        # Download or resize failed; there is nothing to paste.
        logger.warn("new_image paste failed")
        return
    try:
        new_image.paste(image, (rowcol[0] * thumb_size[0],
                                rowcol[1] * thumb_size[1]))
    except Exception:
        # Exception rather than a bare except, so KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        logger.warn("new_image paste failed")


def _combine_factory_build(final_path, img_size, count, image_path_generator):
    ''' Build the mosaic image and save it to final_path.

        The mosaic is a square grid with count // 2 cells per side, each
        cell filled with the next image from image_path_generator.

        :param final_path: where the finished mosaic is saved
        :param img_size: (width, height) of the mosaic in pixels
        :param count: image count; a value below 2 gives zero cells per
            side and the cell-size division raises ZeroDivisionError
        :param image_path_generator: iterator yielding image paths or urls
        :return: final_path
    '''
    new_image = Image.new('RGB', img_size)
    # Floor division: range() rejects the float that '/' produces on
    # Python 3.
    demi_count = count // 2
    thumb_size = (
        int(img_size[0] / demi_count),
        int(img_size[1] / demi_count)
    )

    for i in range(demi_count):
        for j in range(demi_count):
            _combine_factory_build_one(thumb_size,
                                       image_path_generator,
                                       new_image,
                                       (i, j))

    new_image.save(final_path)
    return final_path


def _combine_nopil(_nid, images=None):
    ''' Fallback used when no mosaic is built: return one entry of images
        at an index chosen by randrange, or None when images is empty or
        None. _nid is unused.
    '''
    if images:
        # NOTE(review): whether the upper bound is inclusive depends on
        # qobuz.util.random.randrange -- confirm against that helper.
        return images[randrange(0, len(images) - 1)]
    return None


def _combine_pil(nid, images=None):
    ''' Return the path of the 4-cell mosaic for node nid, building it
        from images when the file does not exist yet.

        The cells are filled by cycling through images, so a list shorter
        than 4 entries repeats.
    '''
    tile_count = 4
    mosaic_size = get_default_image_size()
    target = _combine_factory_final_path(tile_count, nid, mosaic_size)
    if not os.path.exists(target):
        return _combine_factory_build(
            target,
            mosaic_size,
            tile_count,
            next_image_generator_factory(images))
    # Already built by an earlier call: reuse the file on disk
    return target


def combine_factory(pil_available, nid, images=None):
    ''' Return a single cover image path for node nid.

        A mosaic is built only when pil_available is True, the
        'image_create_mosaic' setting is enabled and images holds more
        than one entry; otherwise one of the given images is returned
        as is.

        :param pil_available: True when PIL could be imported
        :param nid: node id, used to name the mosaic file
        :param images: sequence of image paths or urls, may be None
        :return: an image path, or None when images is empty or None
    '''
    if not config.app.registry.get('image_create_mosaic', to='bool'):
        pil_available = False
    # Guard against the None default: len(None) would raise TypeError.
    if pil_available is True and images is not None and len(images) > 1:
        return _combine_pil(nid, images=images)
    return _combine_nopil(nid, images=images)


# combine_factory with the import-time value of PIL_AVAILABLE pre-bound as
# its first argument, so callers pass only nid and images.
combine = functools.partial(combine_factory, PIL_AVAILABLE)