CastagnaIT/plugin.video.netflix

View on GitHub
resources/lib/common/misc_utils.py

Summary

Maintainability
A
0 mins
Test Coverage
# -*- coding: utf-8 -*-
"""
    Copyright (C) 2017 Sebastian Golasch (plugin.video.netflix)
    Copyright (C) 2018 Caphm (original implementation module)
    Miscellaneous utility functions

    SPDX-License-Identifier: MIT
    See LICENSES/MIT.md for more information.
"""
import operator
from urllib.parse import quote, urlencode

from resources.lib.globals import G


def find(value_to_find, attribute, search_space):
    """Find a video with matching id in a dict or list"""
    for video in search_space:
        if video[attribute] == value_to_find:
            return video
    raise KeyError(f'Metadata for {value_to_find} does not exist')


def find_episode_metadata(episode_videoid, metadata):
    """Find metadata for a specific episode within a show metadata dict"""
    season = find(int(episode_videoid.seasonid), 'id', metadata['seasons'])
    episode = find(int(episode_videoid.episodeid), 'id', season.get('episodes', {}))
    return episode, season


def get_class_methods(class_item=None):
    """
    Returns the class methods of agiven class object

    :param class_item: Class item to introspect
    :type class_item: object
    :returns: list -- Class methods
    """
    from types import FunctionType
    _type = FunctionType
    return [x for x, y in class_item.__dict__.items()
            if isinstance(y, _type)]


def build_url(pathitems=None, videoid=None, params=None, mode=None):
    """Build a plugin URL from pathitems and query parameters. Add videoid to the path if it's present."""
    if not (pathitems or videoid):
        raise ValueError('Either pathitems or videoid must be set.')
    path = f'{G.BASE_URL}/{_encode_path(mode, pathitems, videoid)}/{_encode_params(params)}'
    return path


def _expand_mode(mode):
    return [mode] if mode else []


def _expand_videoid(videoid):
    return videoid.to_path() if videoid else []


def _encode_path(mode, pathitems, videoid):
    return quote(
        '/'.join(_expand_mode(mode) +
                 (pathitems or []) +
                 _expand_videoid(videoid)).encode('utf-8'))


def _encode_params(params):
    return f'?{urlencode(params)}' if params else ''


def is_numeric(string):
    """Return true if string represents an integer, else false"""
    try:
        int(string)
    except ValueError:
        return False
    return True


def strp(value, form):
    """
    Helper function to safely create datetime objects from strings

    :return: datetime - parsed datetime object
    """
    # pylint: disable=broad-except
    from datetime import datetime
    def_value = datetime.utcfromtimestamp(0)
    try:
        return datetime.strptime(value, form)
    except TypeError:
        # Python bug https://bugs.python.org/issue27400
        try:
            from time import strptime
            return datetime(*(strptime(value, form)[0:6]))
        except ValueError:
            return def_value
    except Exception:
        return def_value


def strf_timestamp(timestamp, form):
    """
    Helper function to safely create string date time from a timestamp value

    :return: string - date time in the specified form
    """
    from datetime import datetime
    try:
        return datetime.utcfromtimestamp(timestamp).strftime(form)
    except Exception:  # pylint: disable=broad-except
        return ''


# def compress_data(data):
#    """GZIP and b64 encode data"""
#    out = StringIO()
#    with gzip.GzipFile(fileobj=out, mode='w') as outh:
#        outh.write(data)
#    return base64.standard_b64encode(out.getvalue())


def merge_dicts(dict_to_merge, merged_dict):
    """Recursively merge the contents of dict_to_merge into merged_dict.
    Values that are already present in merged_dict will be overwritten if they are also present in dict_to_merge"""
    for key, value in dict_to_merge.items():
        if isinstance(merged_dict.get(key), dict):
            merge_dicts(value, merged_dict[key])
        else:
            merged_dict[key] = value
    return merged_dict


def compare_dict_keys(dict_a, dict_b, compare_keys):
    """Compare two dictionaries with the specified keys"""
    return all(dict_a[k] == dict_b[k] for k in dict_a if k in compare_keys)


def chunked_list(seq, chunk_len):
    for start in range(0, len(seq), chunk_len):
        yield seq[start:start + chunk_len]


def any_value_except(mapping, excluded_keys):
    """Return a random value from a dict that is not associated with excluded_key.
    Raises StopIteration if there are no other keys than excluded_key"""
    return next(mapping[key] for key in mapping if key not in excluded_keys)


def enclose_quotes(content):
    return f'"{content}"'


def make_list(arg):
    """Return a list with arg as its member or arg if arg is already a list. Returns an empty list if arg is None"""
    return (arg
            if isinstance(arg, list)
            else ([arg]
                  if arg is not None
                  else []))


def convert_seconds_to_hms_str(time):
    h = int(time // 3600)
    time %= 3600
    m = int(time // 60)
    s = int(time % 60)
    return f'{h:02d}:{m:02d}:{s:02d}'


def remove_html_tags(raw_html):
    import re
    pattern = re.compile('<.*?>')
    return re.sub(pattern, '', raw_html)


def censure(value, length=3):
    """Censor part of the string with asterisks"""
    if not value:
        return value
    return value[:-length] + '*' * length


def run_threaded(non_blocking, target_func, *args, **kwargs):
    """Call a function in a thread, when specified"""
    if not non_blocking:
        return target_func(*args, **kwargs)
    from threading import Thread
    Thread(target=target_func, args=args, kwargs=kwargs).start()
    return None


class CmpVersion:
    """Comparator for version numbers"""
    def __init__(self, version):
        self.__version = str(version or '')
        self.__ver_list = (self.__version or '0').split('.')

    def __str__(self):
        return self.__version

    def __repr__(self):
        return self.__version

    def __bool__(self):
        """
        Allow "if" operator to check if there is a version set.
        Will return False only when "version" set is an empty string or None.
        """
        return bool(self.__version)

    def __iter__(self):
        """Allow to get the version list by using "list" command builtin"""
        return iter(self.__ver_list)

    def __lt__(self, other):
        """Operator <"""
        return operator.lt(*zip(*map(lambda x, y: (x or 0, y or 0),
                                     map(int, self.__ver_list),
                                     map(int, self.__conv_to_list(other)))))

    def __le__(self, other):
        """Operator <="""
        return operator.le(*zip(*map(lambda x, y: (x or 0, y or 0),
                                     map(int, self.__ver_list),
                                     map(int, self.__conv_to_list(other)))))

    def __gt__(self, other):
        """Operator >"""
        return operator.gt(*zip(*map(lambda x, y: (x or 0, y or 0),
                                     map(int, self.__ver_list),
                                     map(int, self.__conv_to_list(other)))))

    def __ge__(self, other):
        """Operator >="""
        return operator.ge(*zip(*map(lambda x, y: (x or 0, y or 0),
                                     map(int, self.__ver_list),
                                     map(int, self.__conv_to_list(other)))))

    def __eq__(self, other):
        """Operator =="""
        return operator.eq(*zip(*map(lambda x, y: (x or 0, y or 0),
                                     map(int, self.__ver_list),
                                     map(int, self.__conv_to_list(other)))))

    def __ne__(self, other):
        """Operator !="""
        return operator.ne(*zip(*map(lambda x, y: (x or 0, y or 0),
                                     map(int, self.__ver_list),
                                     map(int, self.__conv_to_list(other)))))

    def __conv_to_list(self, value):
        """Convert a string or number or CmpVersion object to a list of strings"""
        if isinstance(value, CmpVersion):
            return list(value)
        return str(value or '0').split('.')