BLKSerene/Wordless

View on GitHub
wordless/wl_utils/wl_misc.py

Summary

Maintainability
A
0 mins
Test Coverage
# ----------------------------------------------------------------------
# Wordless: Utilities - Miscellaneous
# Copyright (C) 2018-2024  Ye Lei (叶磊)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# ----------------------------------------------------------------------

import collections
import copy
import functools
import os
import platform
import time
import traceback
import urllib

import numpy
import packaging.version
import requests
from PyQt5.QtCore import QCoreApplication
from PyQt5.QtWidgets import QMainWindow

from wordless.wl_utils import wl_paths

# Module-level shorthand for Qt's translation lookup (called as _tr(context, text))
_tr = QCoreApplication.translate

def check_os():
    """Identify the host operating system via platform.system().

    Returns:
        A tuple ``(is_windows, is_macos, is_linux)`` of booleans. At most one
        of them is True; all three are False when platform.system() reports
        anything other than 'Windows', 'Darwin' or 'Linux'.
    """
    system = platform.system()

    return system == 'Windows', system == 'Darwin', system == 'Linux'

def get_linux_distro():
    try:
        os_release = platform.freedesktop_os_release()
    # Default to Ubuntu if undetermined
    except OSError:
        os_release = {'ID': 'ubuntu'}

    return os_release['ID']

def change_file_owner_to_user(file_path):
    """Hand a file back to the user who invoked sudo.

    Does nothing unless the process runs as root (uid 0) on macOS or Linux
    and both SUDO_UID and SUDO_GID are present in the environment.

    Args:
        file_path: Path of the file whose owner and group should be changed.

    Raises:
        OSError: If os.chown() fails.
        ValueError: If SUDO_UID or SUDO_GID is set but not an integer.
    """
    # pylint: disable=no-member
    _, is_macos, is_linux = check_os()

    # os.getuid() and os.chown() are available on Unix only
    if (is_macos or is_linux) and os.getuid() == 0:
        sudo_uid = os.environ.get('SUDO_UID')
        sudo_gid = os.environ.get('SUDO_GID')

        # The variables are missing when running as root without sudo (e.g. a
        # root login or a container); int(None) would raise TypeError, and in
        # that case there is no invoking user to give the file back to
        if sudo_uid is not None and sudo_gid is not None:
            os.chown(file_path, int(sudo_uid), int(sudo_gid))

def find_wl_main(widget):
    """Locate the main window associated with a widget.

    Args:
        widget: An object that either stores the main window in its own
            ``main`` instance attribute or can reach it through parent().

    Returns:
        ``widget.main`` when the instance defines it; otherwise the first
        QMainWindow found by walking up the parent() chain, starting with
        the widget itself.
    """
    # Only an attribute set on the instance itself counts
    if 'main' in widget.__dict__:
        return widget.main

    ancestor = widget

    while not isinstance(ancestor, QMainWindow):
        ancestor = ancestor.parent()

    return ancestor

def get_wl_ver():
    """Read the version number from the VERSION file.

    The version is the first line that is neither blank nor a comment
    (a line whose first non-whitespace character is '#').

    Returns:
        The version as a packaging.version.Version.

    Raises:
        ValueError: If the file contains no version line, or (as
            packaging.version.InvalidVersion) if the line cannot be parsed.
    """
    wl_ver = None

    with open(wl_paths.get_path_file('VERSION'), 'r', encoding = 'utf_8') as f:
        for line in f:
            line = line.strip()

            # Strip before testing for '#' so that indented comments are skipped too
            if line and not line.startswith('#'):
                wl_ver = line

                break

    # Fail with a clear message instead of an UnboundLocalError
    if wl_ver is None:
        raise ValueError('No version number found in the VERSION file')

    return packaging.version.Version(wl_ver)

# Value passed as timeout to the requests.get() calls below (requests interprets it in seconds)
REQUESTS_TIMEOUT = 10

def wl_get_proxies(main):
    """Build the ``proxies`` argument for requests from the proxy settings.

    Args:
        main: Object whose ``settings_custom['general']['proxy_settings']``
            dict provides 'use_proxy', 'username', 'password', 'address'
            and 'port'.

    Returns:
        A dict mapping both 'http' and 'https' to the proxy URL, or None
        when 'use_proxy' is falsy.
    """
    proxy_settings = main.settings_custom['general']['proxy_settings']

    if not proxy_settings['use_proxy']:
        return None

    host = f"{proxy_settings['address']}:{proxy_settings['port']}"

    if proxy_settings['username']:
        # quote() leaves '/' unescaped by default, which would end the
        # userinfo part of the URL early; safe = '' escapes every reserved
        # character, including '/', ':' and '@'
        proxy_username = urllib.parse.quote(proxy_settings['username'], safe = '')
        proxy_password = urllib.parse.quote(proxy_settings['password'], safe = '')

        proxy = f'http://{proxy_username}:{proxy_password}@{host}'
    else:
        proxy = f'http://{host}'

    return {'http': proxy, 'https': proxy}

def wl_download(main, url):
    """Fetch a URL with the configured proxies and timeout.

    Args:
        main: Object passed on to wl_get_proxies() to read the proxy settings.
        url: The URL to request.

    Returns:
        A tuple ``(r, err_msg)``. ``r`` is the requests response, or None if
        the request raised requests.RequestException. ``err_msg`` is an empty
        string on success (status code 200), the formatted traceback if the
        request raised, or a short description of the HTTP status otherwise.
    """
    err_msg = ''

    try:
        r = requests.get(url, timeout = REQUESTS_TIMEOUT, proxies = wl_get_proxies(main))
    except requests.RequestException:
        r = None
        err_msg = traceback.format_exc()
    else:
        if r.status_code != 200:
            # No exception is being handled here, so traceback.format_exc()
            # would only produce "NoneType: None"; report the status instead
            err_msg = f'HTTP {r.status_code} {r.reason}: {url}'

    return r, err_msg

def wl_download_file_size(main, url):
    """Look up the size of a remote file from its Content-Length header.

    Args:
        main: Object passed on to wl_get_proxies() to read the proxy settings.
        url: The URL of the file.

    Returns:
        The size in megabytes as a float. The result is 0.0 when the request
        fails, the status code is not 200, or the Content-Length header is
        missing or not an integer.
    """
    file_size = 0

    try:
        # stream = True avoids downloading the body; the with block releases
        # the connection even if reading the header fails
        # See: https://requests.readthedocs.io/en/latest/user/advanced/#body-content-workflow
        with requests.get(url, timeout = REQUESTS_TIMEOUT, stream = True, proxies = wl_get_proxies(main)) as r:
            if r.status_code == 200:
                # Servers may omit Content-Length (e.g. chunked responses)
                file_size = int(r.headers.get('content-length', 0))
    except (requests.RequestException, ValueError):
        # Best effort: an unreachable server or a malformed header means the size is unknown
        file_size = 0

    # In megabytes
    return file_size / 1024 / 1024

def log_time(func):
    """Decorator that appends the wrapped call's elapsed time to the status bar.

    The first positional argument of the decorated callable must be either a
    QMainWindow or an object exposing the main window as ``.main``. After the
    call, "(In M minute(s) S.SS seconds)" is shown in the main window's
    status bar: it replaces an existing timing suffix, is appended to any
    other current message, or is shown alone when the status bar is empty.

    Nothing is logged when the decorated callable returns the string
    'skip_logging_time'. The return value is always passed through unchanged.
    """
    # Keep the decorated callable's name and other metadata on the wrapper
    @functools.wraps(func)
    def wrapper(widget, *args, **kwargs):
        main = widget if isinstance(widget, QMainWindow) else widget.main

        # A monotonic clock is unaffected by system clock adjustments
        time_start = time.perf_counter()

        return_val = func(widget, *args, **kwargs)

        if return_val != 'skip_logging_time':
            time_elapsed = time.perf_counter() - time_start
            time_elapsed_mins = int(time_elapsed // 60)
            time_elapsed_secs = time_elapsed % 60

            msg_min = _tr('wl_misc', 'minute') if time_elapsed_mins == 1 else _tr('wl_misc', 'minutes')
            msg_time = _tr('wl_misc', '(In {} {} {:.2f} seconds)').format(time_elapsed_mins, msg_min, time_elapsed_secs)

            status_bar = main.statusBar()

            if (msg_cur := status_bar.currentMessage()):
                msg_prefix = _tr('wl_misc', '(In')

                if msg_prefix in msg_cur:
                    # Replace the timing info left over from a previous run
                    status_bar.showMessage(f'{msg_cur.split(msg_prefix)[0].rstrip()} {msg_time}')
                else:
                    status_bar.showMessage(f'{msg_cur} {msg_time}')
            else:
                status_bar.showMessage(msg_time)

        return return_val

    return wrapper

def flatten_list(list_to_flatten):
    """Lazily yield the leaf items of an arbitrarily nested iterable.

    Strings and bytes are treated as leaves and yielded whole; every other
    iterable item is descended into recursively, in order.

    Args:
        list_to_flatten: The (possibly nested) iterable to flatten.

    Yields:
        Each non-iterable item, str or bytes, depth first from left to right.
    """
    for element in list_to_flatten:
        is_leaf = (
            isinstance(element, (str, bytes))
            or not isinstance(element, collections.abc.Iterable)
        )

        if is_leaf:
            yield element
        else:
            yield from flatten_list(element)

def merge_dicts(dicts_to_merge):
    """Merge a sequence of dicts into one dict of per-source value lists.

    Each key of the result maps to a list with one slot per input dict,
    holding that dict's value for the key. Slots for dicts that lack the key
    hold a default: 0 when values are scalars, or a list of zeros when values
    are lists. Whether values are lists, and how long the zero list is, is
    decided from the first value of the first non-empty dict.

    Args:
        dicts_to_merge: Sequence of dicts to merge.

    Returns:
        The merged dict; empty when there is no non-empty input dict.
    """
    merged = {}
    num_dicts = len(dicts_to_merge)

    first_non_empty = next((candidate for candidate in dicts_to_merge if candidate), None)

    if first_non_empty is None:
        return merged

    # The first value of the first non-empty dict determines the value layout
    sample_value = next(iter(first_non_empty.values()))
    values_are_lists = isinstance(sample_value, list)
    num_cols = len(sample_value) if values_are_lists else 0

    for idx, current in enumerate(dicts_to_merge):
        for key, values in current.items():
            if key not in merged:
                # Build a fresh default per key so that no two keys share lists
                if values_are_lists:
                    merged[key] = [[0] * num_cols for _ in range(num_dicts)]
                else:
                    merged[key] = [0] * num_dicts

            merged[key][idx] = values

    return merged

def normalize_nums(nums, normalized_min, normalized_max, reverse = False):
    """Linearly rescale numbers into [normalized_min, normalized_max].

    Args:
        nums: Sequence of numbers to rescale.
        normalized_min: Lower bound of the target range.
        normalized_max: Upper bound of the target range.
        reverse: If True, the smallest input maps to normalized_max and the
            largest to normalized_min.

    Returns:
        A list with one rescaled value per input number. When all inputs are
        equal, every value is the midpoint of the target range. An empty
        input gives an empty list.
    """
    # min() and max() raise ValueError on an empty sequence
    if len(nums) == 0:
        return []

    nums_min = min(nums)
    nums_max = max(nums)

    if nums_min == nums_max:
        # The midpoint is (min + max) / 2; (max - min) / 2 falls outside the
        # target range whenever normalized_min is not 0
        return [(normalized_min + normalized_max) / 2] * len(nums)

    if reverse:
        targets = [normalized_max, normalized_min]
    else:
        targets = [normalized_min, normalized_max]

    # One vectorized interpolation; list() keeps the numpy.float64 items
    return list(numpy.interp(nums, [nums_min, nums_max], targets))