jakubroztocil/httpie

View on GitHub
httpie/cli/nested_json/interpret.py

Summary

Maintainability
B
4 hrs
Test Coverage
from typing import Type, Union, Any, Iterable, Tuple

from .parse import parse, assert_cant_happen
from .errors import NestedJSONSyntaxError
from .tokens import EMPTY_STRING, TokenKind, Token, PathAction, Path, NestedJSONArray


__all__ = [
    'interpret_nested_json',
    'unwrap_top_level_list_if_needed',
]

JSONType = Type[Union[dict, list, int, float, str]]
JSON_TYPE_MAPPING = {
    dict: 'object',
    list: 'array',
    int: 'number',
    float: 'number',
    str: 'string',
}


def interpret_nested_json(pairs: Iterable[Tuple[str, str]]) -> dict:
    context = None
    for key, value in pairs:
        context = interpret(context, key, value)
    return wrap_with_dict(context)


def interpret(context: Any, key: str, value: Any) -> Any:
    cursor = context
    paths = list(parse(key))
    paths.append(Path(PathAction.SET, value))

    # noinspection PyShadowingNames
    def type_check(index: int, path: Path, expected_type: JSONType):
        if not isinstance(cursor, expected_type):
            if path.tokens:
                pseudo_token = Token(
                    kind=TokenKind.PSEUDO,
                    value='',
                    start=path.tokens[0].start,
                    end=path.tokens[-1].end,
                )
            else:
                pseudo_token = None
            cursor_type = JSON_TYPE_MAPPING.get(type(cursor), type(cursor).__name__)
            required_type = JSON_TYPE_MAPPING[expected_type]
            message = f'Cannot perform {path.kind.to_string()!r} based access on '
            message += repr(''.join(path.reconstruct() for path in paths[:index]))
            message += f' which has a type of {cursor_type!r} but this operation'
            message += f' requires a type of {required_type!r}.'
            raise NestedJSONSyntaxError(
                source=key,
                token=pseudo_token,
                message=message,
                message_kind='Type',
            )

    def object_for(kind: PathAction) -> Any:
        if kind is PathAction.KEY:
            return {}
        elif kind in {PathAction.INDEX, PathAction.APPEND}:
            return []
        else:
            assert_cant_happen()

    for index, (path, next_path) in enumerate(zip(paths, paths[1:])):
        # If there is no context yet, set it.
        if cursor is None:
            context = cursor = object_for(path.kind)
        if path.kind is PathAction.KEY:
            type_check(index, path, dict)
            if next_path.kind is PathAction.SET:
                cursor[path.accessor] = next_path.accessor
                break
            cursor = cursor.setdefault(path.accessor, object_for(next_path.kind))
        elif path.kind is PathAction.INDEX:
            type_check(index, path, list)
            if path.accessor < 0:
                raise NestedJSONSyntaxError(
                    source=key,
                    token=path.tokens[1],
                    message='Negative indexes are not supported.',
                    message_kind='Value',
                )
            cursor.extend([None] * (path.accessor - len(cursor) + 1))
            if next_path.kind is PathAction.SET:
                cursor[path.accessor] = next_path.accessor
                break
            if cursor[path.accessor] is None:
                cursor[path.accessor] = object_for(next_path.kind)
            cursor = cursor[path.accessor]
        elif path.kind is PathAction.APPEND:
            type_check(index, path, list)
            if next_path.kind is PathAction.SET:
                cursor.append(next_path.accessor)
                break
            cursor.append(object_for(next_path.kind))
            cursor = cursor[-1]
        else:
            assert_cant_happen()

    return context


def wrap_with_dict(context):
    if context is None:
        return {}
    elif isinstance(context, list):
        return {
            EMPTY_STRING: NestedJSONArray(context),
        }
    else:
        assert isinstance(context, dict)
        return context


def unwrap_top_level_list_if_needed(data: dict):
    """
    Propagate the top-level list, if that’s what we got.

    """
    if len(data) == 1:
        key, value = list(data.items())[0]
        if isinstance(value, NestedJSONArray):
            assert key == EMPTY_STRING
            return value
    return data