jakubroztocil/httpie

View on GitHub
httpie/utils.py

Summary

Maintainability
A
25 mins
Test Coverage
import os
import base64
import json
import mimetypes
import re
import sys
import time
import tempfile
import sysconfig

from collections import OrderedDict
from contextlib import contextmanager
from http.cookiejar import parse_ns_headers
from pathlib import Path
from pprint import pformat
from urllib.parse import urlsplit
from typing import Any, List, Optional, Tuple, Generator, Callable, Iterable, IO, TypeVar

import requests.auth

RE_COOKIE_SPLIT = re.compile(r', (?=[^ ;]+=)')
Item = Tuple[str, Any]
Items = List[Item]
T = TypeVar("T")


class JsonDictPreservingDuplicateKeys(OrderedDict):
    """A specialized JSON dict preserving duplicate keys."""

    # Python versions prior to 3.8 suffer from an issue with multiple keys with the same name.
    # `json.dumps(obj, indent=N, sort_keys=True)` will output sorted keys when they are unique, and
    # duplicate keys will be outputted as they were defined in the original data.
    # See <https://bugs.python.org/issue23493#msg400929> for the behavior change between Python versions.
    SUPPORTS_SORTING = sys.version_info >= (3, 8)

    def __init__(self, items: Items):
        self._items = items
        self._ensure_items_used()

    def _ensure_items_used(self) -> None:
        """HACK: Force `json.dumps()` to use `self.items()` instead of an empty dict.

        Two JSON encoders are available on CPython: pure-Python (1) and C (2) implementations.

        (1) The pure-python implementation will do a simple `if not dict: return '{}'`,
        and we could fake that check by implementing the `__bool__()` method.
        Source:
            - <https://github.com/python/cpython/blob/9d318ad/Lib/json/encoder.py#L334-L336>

        (2) On the other hand, the C implementation will do a check on the number of
        items contained inside the dict, using a verification on `dict->ma_used`, which
        is updated only when an item is added/removed from the dict. For that case,
        there is no workaround but to add an item into the dict.
        Sources:
            - <https://github.com/python/cpython/blob/9d318ad/Modules/_json.c#L1581-L1582>
            - <https://github.com/python/cpython/blob/9d318ad/Include/cpython/dictobject.h#L53>
            - <https://github.com/python/cpython/blob/9d318ad/Include/cpython/dictobject.h#L17-L18>

        To please both implementations, we simply add one item to the dict.

        """
        if self._items:
            self['__hack__'] = '__hack__'

    def items(self) -> Items:
        """Return all items, duplicate ones included.

        """
        return self._items


def load_json_preserve_order_and_dupe_keys(s):
    return json.loads(s, object_pairs_hook=JsonDictPreservingDuplicateKeys)


def repr_dict(d: dict) -> str:
    return pformat(d)


def humanize_bytes(n, precision=2):
    # Author: Doug Latornell
    # Licence: MIT
    # URL: https://code.activestate.com/recipes/577081/
    """Return a humanized string representation of a number of bytes.

    >>> humanize_bytes(1)
    '1 B'
    >>> humanize_bytes(1024, precision=1)
    '1.0 kB'
    >>> humanize_bytes(1024 * 123, precision=1)
    '123.0 kB'
    >>> humanize_bytes(1024 * 12342, precision=1)
    '12.1 MB'
    >>> humanize_bytes(1024 * 12342, precision=2)
    '12.05 MB'
    >>> humanize_bytes(1024 * 1234, precision=2)
    '1.21 MB'
    >>> humanize_bytes(1024 * 1234 * 1111, precision=2)
    '1.31 GB'
    >>> humanize_bytes(1024 * 1234 * 1111, precision=1)
    '1.3 GB'

    """
    abbrevs = [
        (1 << 50, 'PB'),
        (1 << 40, 'TB'),
        (1 << 30, 'GB'),
        (1 << 20, 'MB'),
        (1 << 10, 'kB'),
        (1, 'B')
    ]

    if n == 1:
        return '1 B'

    for factor, suffix in abbrevs:
        if n >= factor:
            break

    # noinspection PyUnboundLocalVariable
    return f'{n / factor:.{precision}f} {suffix}'


class ExplicitNullAuth(requests.auth.AuthBase):
    """Forces requests to ignore the ``.netrc``.
    <https://github.com/psf/requests/issues/2773#issuecomment-174312831>
    """

    def __call__(self, r):
        return r


def get_content_type(filename):
    """
    Return the content type for ``filename`` in format appropriate
    for Content-Type headers, or ``None`` if the file type is unknown
    to ``mimetypes``.

    """
    return mimetypes.guess_type(filename, strict=False)[0]


def split_cookies(cookies):
    """
    When ``requests`` stores cookies in ``response.headers['Set-Cookie']``
    it concatenates all of them through ``, ``.

    This function splits cookies apart being careful to not to
    split on ``, `` which may be part of cookie value.
    """
    if not cookies:
        return []
    return RE_COOKIE_SPLIT.split(cookies)


def get_expired_cookies(
    cookies: str,
    now: float = None
) -> List[dict]:

    now = now or time.time()

    def is_expired(expires: Optional[float]) -> bool:
        return expires is not None and expires <= now

    attr_sets: List[Tuple[str, str]] = parse_ns_headers(
        split_cookies(cookies)
    )

    cookies = [
        # The first attr name is the cookie name.
        dict(attrs[1:], name=attrs[0][0])
        for attrs in attr_sets
    ]

    _max_age_to_expires(cookies=cookies, now=now)

    return [
        {
            'name': cookie['name'],
            'path': cookie.get('path', '/')
        }
        for cookie in cookies
        if is_expired(expires=cookie.get('expires'))
    ]


def _max_age_to_expires(cookies, now):
    """
    Translate `max-age` into `expires` for Requests to take it into account.

    HACK/FIXME: <https://github.com/psf/requests/issues/5743>

    """
    for cookie in cookies:
        if 'expires' in cookie:
            continue
        max_age = cookie.get('max-age')
        if max_age and max_age.isdigit():
            cookie['expires'] = now + float(max_age)


def parse_content_type_header(header):
    """Borrowed from requests."""
    tokens = header.split(';')
    content_type, params = tokens[0].strip(), tokens[1:]
    params_dict = {}
    items_to_strip = "\"' "
    for param in params:
        param = param.strip()
        if param:
            key, value = param, True
            index_of_equals = param.find("=")
            if index_of_equals != -1:
                key = param[:index_of_equals].strip(items_to_strip)
                value = param[index_of_equals + 1:].strip(items_to_strip)
            params_dict[key.lower()] = value
    return content_type, params_dict


def as_site(path: Path, **extra_vars) -> Path:
    site_packages_path = sysconfig.get_path(
        'purelib',
        vars={'base': str(path), **extra_vars}
    )
    return Path(site_packages_path)


def get_site_paths(path: Path) -> Iterable[Path]:
    from httpie.compat import (
        MIN_SUPPORTED_PY_VERSION,
        MAX_SUPPORTED_PY_VERSION,
        is_frozen
    )

    if is_frozen:
        [major, min_minor] = MIN_SUPPORTED_PY_VERSION
        [major, max_minor] = MAX_SUPPORTED_PY_VERSION
        for minor in range(min_minor, max_minor + 1):
            yield as_site(
                path,
                py_version_short=f'{major}.{minor}'
            )
    else:
        yield as_site(path)


def split_iterable(iterable: Iterable[T], key: Callable[[T], bool]) -> Tuple[List[T], List[T]]:
    left, right = [], []
    for item in iterable:
        if key(item):
            left.append(item)
        else:
            right.append(item)
    return left, right


def unwrap_context(exc: Exception) -> Optional[Exception]:
    context = exc.__context__
    if isinstance(context, Exception):
        return unwrap_context(context)
    else:
        return exc


def url_as_host(url: str) -> str:
    return urlsplit(url).netloc.split('@')[-1]


class LockFileError(ValueError):
    pass


@contextmanager
def open_with_lockfile(file: Path, *args, **kwargs) -> Generator[IO[Any], None, None]:
    file_id = base64.b64encode(os.fsencode(file)).decode()
    target_file = Path(tempfile.gettempdir()) / file_id

    # Have an atomic-like touch here, so we'll tighten the possibility of
    # a race occurring between multiple processes accessing the same file.
    try:
        target_file.touch(exist_ok=False)
    except FileExistsError as exc:
        raise LockFileError("Can't modify a locked file.") from exc

    try:
        with open(file, *args, **kwargs) as stream:
            yield stream
    finally:
        target_file.unlink()


def is_version_greater(version_1: str, version_2: str) -> bool:
    # In an ideal scenario, we would depend on `packaging` in order
    # to offer PEP 440 compatible parsing. But since it might not be
    # commonly available for outside packages, and since we are only
    # going to parse HTTPie's own version it should be fine to compare
    # this in a SemVer subset fashion.

    def split_version(version: str) -> Tuple[int, ...]:
        parts = []
        for part in version.split('.')[:3]:
            try:
                parts.append(int(part))
            except ValueError:
                break
        return tuple(parts)

    return split_version(version_1) > split_version(version_2)