CenterForOpenScience/waterbutler

View on GitHub
waterbutler/providers/googlecloud/utils.py

Summary

Maintainability
A
55 mins
Test Coverage
import re
import base64
import typing
import binascii
from urllib.parse import urlparse, quote

from multidict import MultiDict, MultiDictProxy

from waterbutler.core.path import WaterButlerPath
from waterbutler.core.exceptions import WaterButlerError


def get_obj_name(path: WaterButlerPath, is_folder: bool=False) -> str:
    """Get the object name of the file or folder with the given Waterbutler Path.

    .. note::

        Object Name is used by the Google Cloud Storage API (both XML and JSON) in the request path,
        queries and headers to identify the object.  Folders' names always end with a ``'/'`` and
        files' names never do. In addition, neither of them starts with a ``'/'``.

    :param path: the WaterButlerPath of the object
    :type path: :class:`.WaterButlerPath`
    :param bool is_folder: the folder flag
    :rtype: str
    """

    return validate_path_or_name(path.path.lstrip('/'), is_folder=is_folder)


def build_path(obj_name: str, is_folder: bool=False) -> str:
    """Convert the object name to a path string which can pass WaterButler path validation.

    :param str obj_name: the object name of the object
    :param bool is_folder: the folder flag
    :rtype: str
    """

    return validate_path_or_name(
        obj_name if obj_name.startswith('/') else '/{}'.format(obj_name),
        is_folder=is_folder
    )


def validate_path_or_name(path_or_name: str, is_folder: bool=False) -> str:
    """Validate that path or object name.

    :param str path_or_name: the path or the object name
    :param bool is_folder: the folder flag
    :rtype: str
    """

    if is_folder:
        assert path_or_name.endswith('/')
    else:
        assert not path_or_name.endswith('/')

    return path_or_name


def build_url(base: str, *segments, **query) -> str:
    """Build URL with ``'/'`` encoded in path segments and queries for Google Cloud API.

    Objects' names in Google Cloud Storage contain ``'/'`` which must be encoded.  WB calls
    :func:`urllib.parse.quote()` with optional argument ``safe=''``.  The default is ``safe='/'``.

    :param str base: the base URL
    :param segments: the path segments tuple
    :param query: the queries dictionary
    :rtype: str
    """

    parsed_base = urlparse(base).geturl()

    if not segments:
        path = ''
    else:
        path_segments = []
        for segment in segments:
            # Do not strip leading or trailing `/` from segments
            path_segments.append(quote(segment, safe=''))
        path = '/'.join(path_segments)

    if not query:
        queries = ''
    else:
        query_pairs = []
        for key, value in query.items():
            key_value_pair = [quote(key, safe=''), quote(value, safe='')]
            query_pairs.append('='.join(key_value_pair))
        queries = '?' + '&'.join(query_pairs)

    path = '/' + path if path else ''

    return ''.join([parsed_base, path, queries])


def decode_and_hexlify_hashes(hash_str: str) -> typing.Union[str, None]:
    """Decode a Base64-encoded string and return a hexlified string.

     This helper function inputs and outputs string.  However, both :func:`base64.b64decode()` and
     :func:`binascii.hexlify()` operate on bytes.  WB must call ``.encode()`` and ``.decode()`` to
     convert bytes and string back and forth.

    :param str hash_str: the Base64-encoded hash string
    :rtype: str or None
    """

    return binascii.hexlify(base64.b64decode(hash_str.encode())).decode() if hash_str else None


def build_canonical_ext_headers_str(headers: dict) -> str:
    """Build a string for canonical extension headers, which is part of the string to sign.

    .. note::

        Google Cloud Storage has very strict rules for building this string. See:
        https://cloud.google.com/storage/docs/access-control/signed-urls#about-canonical-extension-headers

        For this very limited version of the Google Cloud provider, only
        :meth:`GoogleCloudProvider._intra_copy_file` uses the canonical extension header and it
        uses only one.  There is no need for extra effort to remove ``x-goog-encryption-key`` and
        ``x-goog-encryption-key-sha256`` or to perform a lexicographical sort.

        *TODO [Phase 2]: fully implement this function when needed*

    :param dict headers: the canonical extension headers
    :rtype: str
    """

    # Return ``''`` instead of ``None`` so that it can be properly concatenated
    if not headers:
        return ''

    if len(headers) != 1:
        raise WaterButlerError('The limited provider only supports one canonical extension header.')

    headers_str = ''
    for key, value in headers.items():
        headers_str += '{}:{}\n'.format(key.strip().lower(), value.strip())

    return headers_str


def verify_raw_google_hash_header(google_hash: str) -> bool:
    """Verify the format of the raw value of the "x-goog-hash" header.

    Note: For now this method is used for tests only.

    :param str google_hash: the raw value of the "x-goog-hash" header
    :rtype: bool
    """

    return bool(re.match(r'(crc32c=[A-Za-z0-9+/=]+),(md5=[A-Za-z0-9+/=]+)', google_hash))


def get_multi_dict_from_python_dict(resp_headers_dict: dict) -> MultiDictProxy:
    """Construct an :class:`aiohttp.MultiDictProxy` instance from a Python dictionary.

    Note: For now, this method is used for test only.

    .. note::

        Neither Python dictionary nor JSON supports multi-value key.  The response headers returned
        by `aiohttp` is of immutable type :class:`aiohttp.MultiDictProxy` while the one returned by
        `aiohttpretty` is of :class:`aiohttp.MultiDict`.

        WB tests use the :class:`aiohttp.MultiDict` type for both files and folders during modification
        and returns the :class:`aiohttp.MultiDictProxy` type to imitate the behavior of `aiohttp`.

    :param dict resp_headers_dict: the raw response headers dictionary
    :rtype: :class:`aiohttp.MultiDictProxy`
    """

    resp_headers = MultiDict(resp_headers_dict)
    google_hash = resp_headers.get('x-goog-hash', None)
    if google_hash:
        assert verify_raw_google_hash_header(google_hash)
        resp_headers.pop('x-goog-hash')
        google_hash_list = google_hash.split(',')
        for google_hash in google_hash_list:
            resp_headers.add('x-goog-hash', google_hash)

    return MultiDictProxy(resp_headers)