webcomics/dosage

View on GitHub
dosagelib/util.py

Summary

Maintainability
C
1 day
Test Coverage
D
60%
# SPDX-License-Identifier: MIT
# Copyright (C) 2004-2008 Tristan Seligmann and Jonathan Jacobs
# Copyright (C) 2012-2014 Bastian Kleineidam
# Copyright (C) 2015-2020 Tobias Gruetzmacher
import html
import os
import re
import subprocess
import sys
import time
import traceback

from functools import lru_cache
from urllib.parse import (parse_qs, quote as url_quote, unquote as url_unquote,
        urlparse, urlunparse, urlsplit)
from urllib.robotparser import RobotFileParser

import lxml

from .output import out
from .configuration import UserAgent, App, SupportUrl
from . import AppName

# Upper limit (in bytes) for the size of a downloaded HTML page.
MaxContentBytes = 3 * 1024 * 1024  # 3 MB

# The character set used to encode non-ASCII characters in a URL. See also
# http://tools.ietf.org/html/rfc2396#section-2.1
# Note that the encoding is not really specified, but most browsers
# encode in UTF-8 when no encoding is specified by the HTTP headers,
# otherwise they use the page encoding for followed links. See also
# http://code.google.com/p/browsersec/wiki/Part1#Unicode_in_URLs
UrlEncoding = "utf-8"


def get_system_uid():
    """Return a (probably) unique ID identifying this system.

    Used to differentiate votes. On Windows and macOS a platform specific
    lookup is tried first; if that lookup raises any exception, or on any
    other platform, the MAC address based ID is returned instead.
    """
    if os.name == 'nt':
        platform_lookup = get_nt_system_uid
    elif sys.platform == 'darwin':
        platform_lookup = get_osx_system_uid
    else:
        # no platform specific lookup available
        return get_mac_uid()
    try:
        return platform_lookup()
    except Exception:
        # best effort: fall back to the MAC address
        return get_mac_uid()


def get_nt_system_uid():
    r"""Read the Windows machine GUID from the registry value
    HKEY_LOCAL_MACHINE\Software\Microsoft\Cryptography\MachineGuid

    Both registry handles are closed again before returning, also when
    the lookup fails.
    """
    import winreg
    # registry handle objects close themselves when used as context manager
    with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hive:
        with winreg.OpenKey(hive, r"Software\Microsoft\Cryptography") as key:
            guid, _value_type = winreg.QueryValueEx(key, "MachineGuid")
            return guid


def get_osx_system_uid():
    """Get the OSX system ID (the hardware serial number).

    Parses the output of the system_profiler tool:
    $ system_profiler SPHardwareDataType |grep "r (system)"
    Serial Number (system): C24E1322XYZ

    @return: the text after the first colon of the matching line
    @rtype: string
    @raise: ValueError if no line contains "r (system)"
    """
    # The tool is named "system_profiler" (not "system_profile"). Only the
    # hardware section is requested since that is where the serial number
    # line lives; a full report is much slower to generate.
    res = backtick(["system_profiler", "SPHardwareDataType"]).splitlines()
    for line in res:
        if "r (system)" in line:
            return line.split(':', 1)[1].strip()
    raise ValueError("Could not find system number in %r" % res)


def get_mac_uid():
    """Return the node ID reported by uuid.getnode() (normally the MAC
    address of the system) as a decimal string."""
    from uuid import getnode
    return "%d" % getnode()


def backtick(cmd, encoding='utf-8'):
    """Run the given command and return its decoded standard output.

    @param cmd: the command, passed unchanged to subprocess.Popen
    @param encoding: the encoding used to decode the captured output
    @return: the decoded standard output
    @rtype: string
    """
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    # stderr is not captured, so the second element is always None
    captured, _unused = process.communicate()
    return captured.decode(encoding)


def unicode_safe(text, encoding=UrlEncoding, errors='ignore'):
    """Return text as a str object.

    A str is returned unchanged; anything else is decoded with the given
    encoding and error handling.
    """
    return text if isinstance(text, str) else text.decode(encoding, errors)


def tagre(tag, attribute, value, quote='"', before="", after=""):
    """Build a regular expression matching an HTML tag with the given
    attribute and value. Tag and attribute names are matched case
    insensitive, and arbitrary whitespace and leading HTML attributes are
    skipped. The "<>" at the start and end of the HTML tag is also matched.
    @param tag: the tag name
    @ptype tag: string
    @param attribute: the attribute name
    @ptype attribute: string
    @param value: the attribute value (inserted into the regex as-is)
    @ptype value: string
    @param quote: the attribute quote (default ")
    @ptype quote: string
    @param before: match inside the tag before the attribute
    @ptype before: string
    @param after: match after attribute value but before end
    @ptype after: string

    @return: the generated regular expression suitable for re.compile()
    @rtype: string
    """
    if before:
        # something specific is required in front of the attribute
        leading = r"[^>]*%s[^>]*\s+" % before
    else:
        # optionally skip any other attributes in front
        leading = r"(?:[^>]*\s+)?"
    template = (
        r'<\s*%(tag)s\s+%(prefix)s'
        r'%(attribute)s\s*=\s*%(quote)s%(value)s%(quote)s'
        r'[^>]*%(after)s[^>]*>'
    )
    return template % {
        'tag': case_insensitive_re(tag),
        'attribute': case_insensitive_re(attribute),
        'value': value,
        'quote': quote,
        'prefix': leading,
        'after': after,
    }


def case_insensitive_re(name):
    """Turn the given name into a regular expression string matching it
    case insensitively without needing re.IGNORECASE, so that only selected
    parts of a larger pattern ignore case.

    Every character becomes a two-element character class holding its
    lower and upper case form.
    @param name: the name to make case insensitive
    @ptype name: string
    @return: the case insensitive regex
    @rtype: string
    """
    classes = []
    for char in name:
        classes.append("[%s%s]" % (char.lower(), char.upper()))
    return "".join(classes)


def get_page(url, session, **kwargs):
    """Fetch the given URL and return the response object.

    The robots.txt check runs first and the download is limited to
    MaxContentBytes. Extra keyword arguments are passed on to urlopen().
    """
    check_robotstxt(url, session)
    # fetch the page itself
    response = urlopen(url, session, max_content_bytes=MaxContentBytes,
                       **kwargs)
    out.debug(u"Got page content %r" % response.content, level=3)
    return response


def makeSequence(item):
    """Return item unchanged if it is a list or tuple, otherwise wrap it
    in a one-element tuple."""
    return item if isinstance(item, (list, tuple)) else (item,)


def prettyMatcherList(things):
    """Build a nicely formatted string for one matcher or a list of
    matchers. Objects with a 'pattern' attribute (like compiled regular
    expressions) are shown by that pattern, anything else is used as-is."""
    patterns = [getattr(matcher, 'pattern', matcher)
                for matcher in makeSequence(things)]
    return "('%s')" % "', '".join(patterns)


def normaliseURL(url):
    """Normalise the given URL. Normalising
    - strips any leading or trailing whitespace,
    - replaces HTML entities and character references,
    - removes any leading empty or '..' path segments,
    - removes a leading '&' from the query,
    - removes the fragment (anchor).
    """
    # XXX: brutal hack (unescaping HTML entities in the whole URL)
    url = html.unescape(unicode_safe(url).strip())

    parts = list(urlparse(url))
    segments = parts[2].split('/')
    # count the leading segments that have to go
    skip = 0
    while skip < len(segments) and segments[skip] in ('', '..'):
        skip += 1
    parts[2] = '/' + '/'.join(segments[skip:])
    # index 4 is the query: drop a leading '&'
    query = parts[4]
    if query.startswith('&'):
        parts[4] = query[1:]
    # index 5 is the fragment: drop the anchor
    parts[5] = ""
    return urlunparse(parts)


def get_roboturl(url):
    """Return the robots.txt URL for the scheme and host of the given URL."""
    parsed = urlparse(url)
    robots_parts = (parsed.scheme, parsed.netloc, "/robots.txt", "", "", "")
    return urlunparse(robots_parts)


def check_robotstxt(url, session):
    """Verify that robots.txt permits our user agent to fetch the given URL.
    @raises: IOError if URL is not allowed
    """
    roboturl = get_roboturl(url)
    parser = get_robotstxt_parser(roboturl, session=session)
    if parser.can_fetch(UserAgent, str(url)):
        return
    raise IOError("%s is disallowed by %s" % (url, roboturl))


@lru_cache()
def get_robotstxt_parser(url, session=None):
    """Return a RobotFileParser for the given robots.txt URL.

    Results are memoized by lru_cache on the (url, session) arguments.
    Everything is allowed when the request raises or answers with a status
    code of 400 or above; the rules are parsed only for a 200 answer.
    """
    parser = RobotFileParser()
    try:
        response = urlopen(url, session, max_content_bytes=MaxContentBytes,
                           allow_errors=range(600))
    except Exception:
        # connect or timeout errors are treated as an absent robots.txt
        parser.allow_all = True
        return parser
    status = response.status_code
    if status >= 400:
        parser.allow_all = True
    elif status == 200:
        parser.parse(response.text.splitlines())
    return parser


def urlopen(url, session, referrer=None, max_content_bytes=None,
            allow_errors=(), **kwargs):
    """Open an URL and return the response object.

    @param url: the URL to request
    @param session: object providing request() and a cookies attribute
        (presumably a requests.Session -- confirm against callers)
    @param referrer: if set, sent as the "Referer" request header
    @param max_content_bytes: if set, the maximum allowed value of the
        Content-Length response header, see check_content_size()
    @param allow_errors: container of HTTP status codes for which
        raise_for_status() is not called
    @param kwargs: passed on to session.request(); a 'data' entry makes
        this a POST request instead of a GET request
    @return: the response object returned by session.request()
    """
    out.debug(u'Open URL %s' % url)
    # Work on a private copy so that adding the Referer header never
    # modifies a headers dictionary owned by the caller. A missing or None
    # value is treated as "no extra headers".
    headers = dict(kwargs.get('headers') or {})
    if referrer:
        headers['Referer'] = referrer
    kwargs['headers'] = headers
    out.debug(u'Sending headers %s' % headers, level=3)
    out.debug(u'Sending cookies %s' % session.cookies)
    if 'data' not in kwargs:
        method = 'GET'
    else:
        method = 'POST'
        out.debug(u'Sending POST data %s' % kwargs['data'], level=3)
    req = session.request(method, url, **kwargs)
    out.debug(u'Response cookies: %s' % req.cookies)
    check_content_size(url, req.headers, max_content_bytes)
    if req.status_code not in allow_errors:
        req.raise_for_status()
    return req


def check_content_size(url, headers, max_content_bytes):
    """Ensure the 'content-length' value of the response headers does not
    exceed the given maximum number of bytes.

    Nothing is checked when no maximum is given or the header is missing.
    @raises: IOError if the announced size is larger than the maximum
    """
    if not max_content_bytes or 'content-length' not in headers:
        return
    announced = int(headers['content-length'])
    if announced <= max_content_bytes:
        return
    raise IOError(
        'URL content of %s with %d bytes exceeds %d bytes.' %
        (url, announced, max_content_bytes))


def splitpath(path):
    """Split a path into the list of its components.

    Components are peeled off the end with os.path.split() until an empty
    tail is found.
    """
    components = []
    remainder, last = os.path.split(path)
    while last:
        components.append(last)
        remainder, last = os.path.split(remainder)
    # collected back to front
    components.reverse()
    return components


def getRelativePath(basepath, path):
    """Get a path that is relative to the given base path.

    Both arguments are made absolute first. Unlike a manual component
    walk this also works when path equals basepath (result is '.') or
    when path is an ancestor of basepath (result only consists of '..'
    components).

    @param basepath: the directory the result is relative to
    @ptype basepath: string
    @param path: the path to express relative to basepath
    @ptype path: string
    @return: the relative path, or the absolute path if no relative path
        can be computed
    @rtype: string
    """
    basepath = os.path.abspath(basepath)
    path = os.path.abspath(path)
    try:
        return os.path.relpath(path, basepath)
    except ValueError:
        # os.path.relpath() raises ValueError when no relative path exists
        # (e.g. different drives on Windows); use the absolute path then.
        return path


def getQueryParams(url):
    """Return the query parameters of the given URL as parsed by
    parse_qs()."""
    query_string = urlsplit(url).query
    message = u'Extracting query parameters from %r (%r)...' % (
        url, query_string)
    out.debug(message)
    return parse_qs(query_string)


def internal_error(out=sys.stderr, etype=None, evalue=None, tb=None):
    """Print an internal error report (output defaults to stderr).

    Exception type, value and traceback default to the exception that is
    currently being handled. The report consists of a bug report request,
    the traceback, and application, proxy and locale information.
    """
    print(os.linesep, file=out)
    print("""********** Oops, I did it again. *************

You have found an internal error in %(app)s. Please write a bug report
at %(url)s and include at least the information below:

Not disclosing some of the information below due to privacy reasons is ok.
I will try to help you nonetheless, but you have to give me something
I can work with ;) .
""" % {'app': AppName, 'url': SupportUrl}, file=out)
    # fill in whatever the caller did not supply from the active exception
    active_type, active_value, active_tb = sys.exc_info()
    if etype is None:
        etype = active_type
    if evalue is None:
        evalue = active_value
    print(etype, evalue, file=out)
    if tb is None:
        tb = active_tb
    traceback.print_exception(etype, evalue, tb, None, out)
    for report in (print_app_info, print_proxy_info, print_locale_info):
        report(out=out)
    footer = "******** %s internal error, over and out ********" % AppName
    print(os.linesep, footer, file=out)


def print_env_info(key, out=sys.stderr):
    """Print the given environment variable with its value, but only if
    it is defined."""
    value = os.environ.get(key)
    if value is None:
        return
    print(key, "=", repr(value), file=out)


def print_proxy_info(out=sys.stderr):
    """Print the http_proxy environment variable if it is defined."""
    proxy_variable = "http_proxy"
    print_env_info(proxy_variable, out=out)


def print_locale_info(out=sys.stderr):
    """Print the locale related environment variables that are defined."""
    locale_variables = ("LANGUAGE", "LC_ALL", "LC_CTYPE", "LANG")
    for variable in locale_variables:
        print_env_info(variable, out=out)


def print_app_info(out=sys.stderr):
    """Print system and application info (output defaults to stderr).

    Covers the application, the Python version and platform, the libxml2
    version reported by lxml, the local time and the command line.
    """
    print("System info:", file=out)
    print(App, file=out)
    python_info = {"version": sys.version, "platform": sys.platform}
    print("Python %(version)s on %(platform)s" % python_info, file=out)
    # NOTE(review): this relies on the lxml.etree submodule having been
    # imported somewhere else already -- confirm.
    libxml_version = lxml.etree.LIBXML_VERSION
    print("libxml2 version: %i.%i.%i" % libxml_version, file=out)
    print("Local time:", strtime(time.time()), file=out)
    print("sys.argv", sys.argv, file=out)


def strtime(t):
    """Format the given timestamp as local date and time
    (YYYY-MM-DD HH:MM:SS) followed by the result of strtimezone()."""
    local_stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(t))
    return local_stamp + strtimezone()


def strtimezone():
    """Return the current local UTC offset in the +HHMM / -HHMM form that
    %z produces on platforms supporting it.

    @return: sign, two digit hours and two digit minutes, e.g. "+0100"
    @rtype: string
    """
    # time.daylight only tells that a DST timezone is defined, not that DST
    # is in effect right now; tm_isdst of the current local time does.
    dst_active = time.daylight and time.localtime().tm_isdst > 0
    # both values are seconds *west* of UTC, hence the inverted sign
    zone = time.altzone if dst_active else time.timezone
    sign = "-" if zone > 0 else "+"
    hours, minutes = divmod(abs(zone) // 60, 60)
    return "%s%02d%02d" % (sign, hours, minutes)


def rfc822date(indate):
    """Format the given timestamp as an rfc822 date string in GMT."""
    utc_time = time.gmtime(indate)
    return time.strftime("%a, %d %b %Y %H:%M:%S GMT", utc_time)


def unquote(text):
    """Replace all percent-encoded entities in text.

    Decoding is repeated, so multiply encoded text is fully decoded. It
    stops once no '%' is left or another pass changes nothing.
    """
    # sentinel that never compares equal to the text
    previous = object()
    while previous != text and '%' in text:
        previous, text = text, url_unquote(text)
    return text


def quote(text, safechars='/'):
    """Percent-encode the given text, leaving the characters listed in
    safechars untouched."""
    return url_quote(text, safe=safechars)


def strsize(b):
    """Return a human readable representation of b bytes, using the units
    B, KB, MB and GB with a precision depending on the magnitude.
    A negative number of bytes raises a value error."""
    if b < 0:
        raise ValueError("Invalid negative byte number")
    kilo = 1024
    mega = kilo * 1024
    giga = mega * 1024
    if b < kilo:
        text = "%dB" % b
    elif b < 10 * kilo:
        text = "%dKB" % (b // kilo)
    elif b < mega:
        text = "%.2fKB" % (float(b) / kilo)
    elif b < 10 * mega:
        text = "%.2fMB" % (float(b) / mega)
    elif b < giga:
        text = "%.1fMB" % (float(b) / mega)
    elif b < 10 * giga:
        text = "%.2fGB" % (float(b) / giga)
    else:
        text = "%.1fGB" % (float(b) / giga)
    return text


def getFilename(name):
    """Derive a filename from the given name that is free of dangerous or
    incompatible characters."""
    # everything outside of the allowed set becomes an underscore
    cleaned = re.sub(r"[^0-9a-zA-Z_\-\.]", "_", name)
    # collapse runs of dots and runs of underscores to a single character
    cleaned = re.sub(r"\.{2,}", ".", cleaned)
    cleaned = re.sub(r"_{2,}", "_", cleaned)
    # drop one leading dot or minus
    if cleaned[:1] in (".", "-") and cleaned:
        cleaned = cleaned[1:]
    return cleaned


def getExistingFile(name, max_suffix=1000):
    """Try the name itself and then numbered variants of it ("-1", "-2",
    ... inserted before the extension) until an existing file is found.
    @return: filename if file is found
    @raise: ValueError if maximum suffix number is reached while searching
    """
    stem, ext = os.path.splitext(name)
    if os.path.exists(name):
        return name
    counter = 1
    while True:
        candidate = stem + "-%d" % counter + ext
        # the limit is checked before the candidate itself is looked at
        if counter + 1 >= max_suffix:
            raise ValueError("No file %r found" % name)
        if os.path.exists(candidate):
            return candidate
        counter += 1


def getNonexistingFile(name):
    """Try the name itself and then numbered variants of it ("-1", "-2",
    ... inserted before the extension) until one does not exist.
    @return: filename
    """
    stem, ext = os.path.splitext(name)
    candidate = name
    counter = 0
    while os.path.exists(candidate):
        counter += 1
        candidate = stem + "-%d" % counter + ext
    return candidate


def strlimit(s, length=72):
    """Cut the string off at the given limit and append three dots if it
    is longer than that limit. With a limit of zero a too long string
    becomes the empty string.

    @param s: the string to limit
    @type s: string
    @param length: maximum length
    @type length: non-negative integer
    @return: limited string, at most length+3 characters long
    """
    assert length >= 0, "length limit must be a non-negative integer"
    if s and len(s) > length:
        return "%s..." % s[:length] if length else ""
    # empty or short enough: nothing to do
    return s


def uniq(input):
    """Return a new list with the items of input in their original order,
    keeping only the first occurrence of equal items."""
    kept = []
    for candidate in input:
        if candidate in kept:
            continue
        kept.append(candidate)
    return kept