jacquev6/LowVoltage

View on GitHub
LowVoltage/actions/query.py

Summary

Maintainability
F
1 wk
Test Coverage
# coding: utf8

# Copyright 2014-2015 Vincent Jacques <vincent@vincent-jacques.net>

"""
When given a :class:`Query`, the connection will return a :class:`QueryResponse`:

>>> connection(Query(table2).key_eq("h", 42))
<LowVoltage.actions.query.QueryResponse ...>

Items are accessed like this:

>>> connection(Query(table2).key_eq("h", 42)).items
[{u'h': 42, u'r1': 0, u'r2': 10}, {u'h': 42, u'r1': 1, u'r2': 9}, {u'h': 42, u'r1': 2, u'r2': 8}, {u'h': 42, u'r1': 3, u'r2': 7}, {u'h': 42, u'r1': 4, u'r2': 6}, {u'h': 42, u'r1': 5, u'r2': 5}, {u'h': 42, u'r1': 6}, {u'h': 42, u'r1': 7}, {u'h': 42, u'r1': 8}, {u'h': 42, u'r1': 9}]

See also the :func:`.iterate_query` compound, and :ref:`actions-vs-compounds` in the user guide.
"""

import LowVoltage as _lv
import LowVoltage.testing as _tst
from .action import Action
from .conversion import _convert_value_to_db, _convert_db_to_dict
from .next_gen_mixins import proxy
from .next_gen_mixins import OptionalBoolParameter, OptionalDictParameter
from .next_gen_mixins import (
    ConsistentRead,
    ExclusiveStartKey,
    ExpressionAttributeNames,
    ExpressionAttributeValues,
    FilterExpression,
    IndexName,
    Limit,
    ProjectionExpression,
    ReturnConsumedCapacity,
    Select,
    TableName,
)
from .return_types import ConsumedCapacity, _is_dict, _is_int, _is_list_of_dict


class QueryResponse(object):
    """
    QueryResponse()

    The `Query response <http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Query.html#API_Query_ResponseElements>`__.
    """

    def __init__(
        self,
        ConsumedCapacity=None,
        Count=None,
        Items=None,
        LastEvaluatedKey=None,
        ScannedCount=None,
        **dummy
    ):
        # The raw values are kept untouched; each property validates and converts its own field on access.
        # Any keyword argument not listed above lands in 'dummy' and is ignored.
        self.__raw_consumed_capacity = ConsumedCapacity
        self.__raw_count = Count
        self.__raw_items = Items
        self.__raw_last_evaluated_key = LastEvaluatedKey
        self.__raw_scanned_count = ScannedCount

    @property
    def consumed_capacity(self):
        """
        The capacity consumed by the request, if you used :meth:`~Query.return_consumed_capacity_total` or :meth:`~Query.return_consumed_capacity_indexes`.

        :type: ``None`` or :class:`.ConsumedCapacity`
        """
        raw = self.__raw_consumed_capacity
        if not _is_dict(raw):
            return None
        return ConsumedCapacity(**raw)

    @property
    def count(self):
        """
        The number of items matching the query.

        :type: ``None`` or long
        """
        raw = self.__raw_count
        if not _is_int(raw):
            return None
        return long(raw)

    @property
    def items(self):
        """
        The items matching the query, unless you used :meth:`~Query.select_count`.

        :type: ``None`` or list of dict
        """
        raw = self.__raw_items
        if not _is_list_of_dict(raw):
            return None
        return [_convert_db_to_dict(item) for item in raw]

    @property
    def last_evaluated_key(self):
        """
        The key of the last item evaluated by the query. If not None, it should be given to :meth:`~Query.exclusive_start_key` in a subsequent :class:`Query`.

        The :func:`.iterate_query` compound does that for you.

        :type: ``None`` or dict
        """
        raw = self.__raw_last_evaluated_key
        if not _is_dict(raw):
            return None
        return _convert_db_to_dict(raw)

    @property
    def scanned_count(self):
        """
        The number of items scanned during the query. This can be different from :attr:`count` when using :meth:`~Query.filter_expression`.

        :type: ``None`` or long
        """
        raw = self.__raw_scanned_count
        if not _is_int(raw):
            return None
        return long(raw)


class KeyConditions(OptionalDictParameter):
    """
    The ``KeyConditions`` parameter of a :class:`Query`.

    Each entry is stored as an ``(operator, values)`` pair and rendered by :meth:`_convert`.
    """

    def __init__(self, parent):
        """
        :param parent: forwarded to the base class together with the ``"KeyConditions"`` parameter name.
        """
        super(KeyConditions, self).__init__("KeyConditions", parent)

    def _convert(self, condition):
        """
        Build the wire representation of one key condition.

        :param condition: a pair ``(operator, values)``: the comparison operator name and an iterable of Python values.
        :return: a dict with the ``"ComparisonOperator"`` and ``"AttributeValueList"`` keys.
        """
        # Unpack in the body rather than in the signature: tuple parameters were removed from the language by PEP 3113.
        operator, values = condition
        return {
            "ComparisonOperator": operator,
            "AttributeValueList": [_convert_value_to_db(value) for value in values]
        }


class Query(Action):
    """
    The `Query request <http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Query.html#API_Query_RequestParameters>`__.
    """

    def __init__(self, table_name=None):
        """
        Passing ``table_name`` to the constructor is like calling :meth:`table_name` on the new instance.
        """
        super(Query, self).__init__("Query", QueryResponse)
        # One parameter object per request field; the builder methods below delegate to them.
        self.__consistent_read = ConsistentRead(self)
        self.__exclusive_start_key = ExclusiveStartKey(self)
        self.__expression_attribute_names = ExpressionAttributeNames(self)
        self.__expression_attribute_values = ExpressionAttributeValues(self)
        self.__filter_expression = FilterExpression(self)
        self.__index_name = IndexName(self)
        self.__key_conditions = KeyConditions(self)
        self.__limit = Limit(self)
        self.__projection_expression = ProjectionExpression(self)
        self.__return_consumed_capacity = ReturnConsumedCapacity(self)
        self.__scan_index_forward = OptionalBoolParameter("ScanIndexForward", self)
        self.__select = Select(self)
        self.__table_name = TableName(self, table_name)

    @property
    def payload(self):
        # Merge the dict fragment contributed by each parameter, always in the same order.
        parameters = (
            self.__consistent_read,
            self.__exclusive_start_key,
            self.__expression_attribute_names,
            self.__expression_attribute_values,
            self.__filter_expression,
            self.__index_name,
            self.__key_conditions,
            self.__limit,
            self.__projection_expression,
            self.__return_consumed_capacity,
            self.__scan_index_forward,
            self.__select,
            self.__table_name,
        )
        merged = {}
        for parameter in parameters:
            merged.update(parameter.payload)
        return merged

    def __add_key_condition(self, name, operator, *values):
        # Shared implementation of the key_* builders: record an (operator, [values]) pair for attribute 'name'.
        return self.__key_conditions.add(name, (operator, list(values)))

    @proxy
    def table_name(self, table_name):
        """
        >>> connection(
        ...   Query()
        ...     .table_name(table2)
        ...     .key_eq("h", 42)
        ... )
        <LowVoltage.actions.query.QueryResponse ...>
        """
        return self.__table_name.set(table_name)

    def key_eq(self, name, value):
        """
        Add an EQ condition to KeyConditions. Can be used on the hash key as well as on the range key.
        Items in the response are those whose key attribute ``name`` is equal to ``value``.

        >>> connection(
        ...   Query(table2)
        ...     .key_eq("h", 42)
        ... ).items
        [{u'h': 42, u'r1': 0, u'r2': 10}, {u'h': 42, u'r1': 1, u'r2': 9}, {u'h': 42, u'r1': 2, u'r2': 8}, ...]
        """
        return self.__add_key_condition(name, "EQ", value)

    def key_le(self, name, value):
        """
        Add an LE condition to KeyConditions. Can be used on the range key only.
        Items in the response are those whose key attribute ``name`` is less than or equal to ``value``.

        >>> connection(
        ...   Query(table2)
        ...     .key_eq("h", 42)
        ...     .key_le("r1", 1)
        ... ).items
        [{u'h': 42, u'r1': 0, u'r2': 10}, {u'h': 42, u'r1': 1, u'r2': 9}]
        """
        return self.__add_key_condition(name, "LE", value)

    def key_lt(self, name, value):
        """
        Add an LT condition to KeyConditions. Can be used on the range key only.
        Items in the response are those whose key attribute ``name`` is strictly less than ``value``.

        >>> connection(
        ...   Query(table2)
        ...     .key_eq("h", 42)
        ...     .key_lt("r1", 2)
        ... ).items
        [{u'h': 42, u'r1': 0, u'r2': 10}, {u'h': 42, u'r1': 1, u'r2': 9}]
        """
        return self.__add_key_condition(name, "LT", value)

    def key_ge(self, name, value):
        """
        Add a GE condition to KeyConditions. Can be used on the range key only.
        Items in the response are those whose key attribute ``name`` is greater than or equal to ``value``.

        >>> connection(
        ...   Query(table2)
        ...     .key_eq("h", 42)
        ...     .key_ge("r1", 7)
        ... ).items
        [{u'h': 42, u'r1': 7}, {u'h': 42, u'r1': 8}, {u'h': 42, u'r1': 9}]
        """
        return self.__add_key_condition(name, "GE", value)

    def key_gt(self, name, value):
        """
        Add a GT condition to KeyConditions. Can be used on the range key only.
        Items in the response are those whose key attribute ``name`` is strictly greater than ``value``.

        >>> connection(
        ...   Query(table2)
        ...     .key_eq("h", 42)
        ...     .key_gt("r1", 6)
        ... ).items
        [{u'h': 42, u'r1': 7}, {u'h': 42, u'r1': 8}, {u'h': 42, u'r1': 9}]
        """
        return self.__add_key_condition(name, "GT", value)

    def key_begins_with(self, name, value):
        """
        Add a BEGINS_WITH condition to KeyConditions. Can be used on the range key only, and only if it is a string.
        Items in the response are those whose key attribute ``name`` begins with ``value``.
        """
        return self.__add_key_condition(name, "BEGINS_WITH", value)

    def key_between(self, name, lo, hi):
        """
        Add a BETWEEN condition to KeyConditions. Can be used on the range key only.
        Items in the response are those whose key attribute ``name`` is greater than or equal to ``lo`` and less than or equal to ``hi``.

        >>> connection(
        ...   Query(table2)
        ...     .key_eq("h", 42)
        ...     .key_between("r1", 4, 6)
        ... ).items
        [{u'h': 42, u'r1': 4, u'r2': 6}, {u'h': 42, u'r1': 5, u'r2': 5}, {u'h': 42, u'r1': 6}]
        """
        return self.__add_key_condition(name, "BETWEEN", lo, hi)

    @proxy("Query")
    def exclusive_start_key(self, key):
        """
        The :func:`.iterate_query` compound does that for you.

        >>> r = connection(
        ...   Query(table2)
        ...     .key_eq("h", 42)
        ...     .limit(2)
        ... )
        >>> r.items
        [{u'h': 42, u'r1': 0, u'r2': 10}, {u'h': 42, u'r1': 1, u'r2': 9}]
        >>> r.last_evaluated_key
        {u'h': 42, u'r1': 1}
        >>> connection(
        ...   Query(table2)
        ...     .key_eq("h", 42)
        ...     .limit(2)
        ...     .exclusive_start_key({u'h': 42, u'r1': 1})
        ... ).items
        [{u'h': 42, u'r1': 2, u'r2': 8}, {u'h': 42, u'r1': 3, u'r2': 7}]
        """
        return self.__exclusive_start_key.set(key)

    @proxy
    def limit(self, limit):
        """
        See :meth:`exclusive_start_key` for an example.
        """
        return self.__limit.set(limit)

    @proxy
    def select_count(self):
        """
        >>> r = connection(
        ...   Query(table2)
        ...     .key_eq("h", 42)
        ...     .select_count()
        ... )
        >>> r.count
        10L
        >>> print r.items
        None
        """
        return self.__select.count()

    @proxy
    def select_all_attributes(self):
        """
        >>> connection(
        ...   Query(table2)
        ...     .key_eq("h", 42)
        ...     .index_name("lsi")
        ...     .select_all_attributes()
        ...     .limit(2)
        ... ).items
        [{u'h': 42, u'r1': 5, u'r2': 5}, {u'h': 42, u'r1': 4, u'r2': 6}]
        """
        return self.__select.all_attributes()

    @proxy
    def select_all_projected_attributes(self):
        """
        >>> connection(
        ...   Query(table2)
        ...     .key_eq("h", 42)
        ...     .index_name("lsi")
        ...     .select_all_projected_attributes()
        ...     .limit(2)
        ... ).items
        [{u'h': 42, u'r1': 5, u'r2': 5}, {u'h': 42, u'r1': 4, u'r2': 6}]
        """
        return self.__select.all_projected_attributes()

    @proxy
    def index_name(self, index_name):
        """
        >>> connection(
        ...   Query(table2)
        ...     .key_eq("h", 42)
        ...     .index_name("lsi")
        ... ).items
        [{u'h': 42, u'r1': 5, u'r2': 5}, {u'h': 42, u'r1': 4, u'r2': 6}, {u'h': 42, u'r1': 3, u'r2': 7}, {u'h': 42, u'r1': 2, u'r2': 8}, {u'h': 42, u'r1': 1, u'r2': 9}, {u'h': 42, u'r1': 0, u'r2': 10}]
        """
        return self.__index_name.set(index_name)

    def scan_index_forward_true(self):
        """
        Set ScanIndexForward to true. The response will list items by ascending range key.

        >>> connection(
        ...   Query(table2)
        ...     .key_eq("h", 42)
        ...     .project("r1")
        ...     .scan_index_forward_true()
        ... ).items
        [{u'r1': 0}, {u'r1': 1}, {u'r1': 2}, {u'r1': 3}, {u'r1': 4}, {u'r1': 5}, {u'r1': 6}, {u'r1': 7}, {u'r1': 8}, {u'r1': 9}]
        """
        return self.__scan_index_forward.set(True)

    def scan_index_forward_false(self):
        """
        Set ScanIndexForward to false. The response will list items by descending range key.

        >>> connection(
        ...   Query(table2)
        ...     .key_eq("h", 42)
        ...     .project("r1")
        ...     .scan_index_forward_false()
        ... ).items
        [{u'r1': 9}, {u'r1': 8}, {u'r1': 7}, {u'r1': 6}, {u'r1': 5}, {u'r1': 4}, {u'r1': 3}, {u'r1': 2}, {u'r1': 1}, {u'r1': 0}]
        """
        return self.__scan_index_forward.set(False)

    @proxy
    def project(self, *names):
        """
        >>> connection(
        ...   Query(table2)
        ...     .key_eq("h", 42)
        ...     .project("r2")
        ... ).items
        [{u'r2': 10}, {u'r2': 9}, {u'r2': 8}, {u'r2': 7}, {u'r2': 6}, {u'r2': 5}, {}, {}, {}, {}]
        """
        return self.__projection_expression.add(*names)

    @proxy
    def filter_expression(self, expression):
        """
        >>> connection(
        ...   Query(table2)
        ...     .key_eq("h", 42)
        ...     .key_ge("r1", 2)
        ...     .filter_expression("#syn IN (:val1, :val2)")
        ...     .expression_attribute_name("syn", "r2")
        ...     .expression_attribute_value("val1", 5)
        ...     .expression_attribute_value("val2", 7)
        ... ).items
        [{u'h': 42, u'r1': 3, u'r2': 7}, {u'h': 42, u'r1': 5, u'r2': 5}]
        """
        return self.__filter_expression.set(expression)

    @proxy
    def expression_attribute_name(self, synonym, name):
        """
        See :meth:`filter_expression` for an example.
        """
        return self.__expression_attribute_names.add(synonym, name)

    @proxy
    def expression_attribute_value(self, name, value):
        """
        See :meth:`filter_expression` for an example.
        """
        return self.__expression_attribute_values.add(name, value)

    @proxy
    def consistent_read_true(self):
        """
        >>> connection(
        ...   Query(table)
        ...     .key_eq("h", 0)
        ...     .consistent_read_true()
        ...     .return_consumed_capacity_total()
        ... ).consumed_capacity.capacity_units
        1.0
        """
        return self.__consistent_read.true()

    @proxy
    def consistent_read_false(self):
        """
        >>> connection(
        ...   Query(table)
        ...     .key_eq("h", 0)
        ...     .consistent_read_false()
        ...     .return_consumed_capacity_total()
        ... ).consumed_capacity.capacity_units
        0.5
        """
        return self.__consistent_read.false()

    @proxy
    def return_consumed_capacity_total(self):
        """
        >>> connection(
        ...   Query(table)
        ...     .key_eq("h", 0)
        ...     .return_consumed_capacity_total()
        ... ).consumed_capacity.capacity_units
        0.5
        """
        return self.__return_consumed_capacity.total()

    @proxy
    def return_consumed_capacity_indexes(self):
        """
        >>> c1 = connection(
        ...   Query(table)
        ...     .key_eq("h", 0)
        ...     .return_consumed_capacity_indexes()
        ... ).consumed_capacity
        >>> c1.capacity_units
        0.5
        >>> c1.table.capacity_units
        0.5
        >>> print c1.global_secondary_indexes
        None

        >>> c2 = connection(
        ...   Query(table)
        ...     .index_name("gsi")
        ...     .key_eq("gh", 0)
        ...     .return_consumed_capacity_indexes()
        ... ).consumed_capacity
        >>> c2.capacity_units
        0.5
        >>> c2.table.capacity_units
        0.0
        >>> c2.global_secondary_indexes["gsi"].capacity_units
        0.5
        """
        return self.__return_consumed_capacity.indexes()

    @proxy
    def return_consumed_capacity_none(self):
        """
        >>> print connection(
        ...   Query(table)
        ...     .key_eq("h", 0)
        ...     .return_consumed_capacity_none()
        ... ).consumed_capacity
        None
        """
        return self.__return_consumed_capacity.none()


class QueryUnitTests(_tst.UnitTests):
    # Each test applies a single builder call to a Query on table "Aaa" and checks the resulting payload.

    def _check_payload(self, query, **expected_parameters):
        # The expected payload is always the table name plus the parameters given by the test.
        expected = {"TableName": "Aaa"}
        expected.update(expected_parameters)
        self.assertEqual(query.payload, expected)

    def _check_key_condition(self, query, operator, *db_values):
        # All key_* tests put their condition on the attribute called "name".
        condition = {"ComparisonOperator": operator, "AttributeValueList": list(db_values)}
        self._check_payload(query, KeyConditions={"name": condition})

    def test_name(self):
        self.assertEqual(Query("Aaa").name, "Query")

    def test_table_name(self):
        self._check_payload(Query().table_name("Aaa"))

    def test_constructor(self):
        self._check_payload(Query("Aaa"))

    def test_key_eq(self):
        self._check_key_condition(Query("Aaa").key_eq("name", 42), "EQ", {"N": "42"})

    def test_key_le(self):
        self._check_key_condition(Query("Aaa").key_le("name", 42), "LE", {"N": "42"})

    def test_key_lt(self):
        self._check_key_condition(Query("Aaa").key_lt("name", 42), "LT", {"N": "42"})

    def test_key_ge(self):
        self._check_key_condition(Query("Aaa").key_ge("name", 42), "GE", {"N": "42"})

    def test_key_gt(self):
        self._check_key_condition(Query("Aaa").key_gt("name", 42), "GT", {"N": "42"})

    def test_key_begins_with(self):
        self._check_key_condition(Query("Aaa").key_begins_with("name", u"prefix"), "BEGINS_WITH", {"S": "prefix"})

    def test_key_between(self):
        self._check_key_condition(Query("Aaa").key_between("name", 42, 44), "BETWEEN", {"N": "42"}, {"N": "44"})

    def test_exclusive_start_key(self):
        self._check_payload(Query("Aaa").exclusive_start_key({"h": u"v"}), ExclusiveStartKey={"h": {"S": "v"}})

    def test_limit(self):
        self._check_payload(Query("Aaa").limit(4), Limit=4)

    def test_select_all_attributes(self):
        self._check_payload(Query("Aaa").select_all_attributes(), Select="ALL_ATTRIBUTES")

    def test_select_all_projected_attributes(self):
        self._check_payload(Query("Aaa").select_all_projected_attributes(), Select="ALL_PROJECTED_ATTRIBUTES")

    def test_select_count(self):
        self._check_payload(Query("Aaa").select_count(), Select="COUNT")

    def test_expression_attribute_name(self):
        self._check_payload(Query("Aaa").expression_attribute_name("n", "p"), ExpressionAttributeNames={"#n": "p"})

    def test_expression_attribute_value(self):
        self._check_payload(Query("Aaa").expression_attribute_value("n", u"p"), ExpressionAttributeValues={":n": {"S": "p"}})

    def test_project(self):
        self._check_payload(Query("Aaa").project("a"), ProjectionExpression="a")

    def test_return_consumed_capacity_total(self):
        self._check_payload(Query("Aaa").return_consumed_capacity_total(), ReturnConsumedCapacity="TOTAL")

    def test_return_consumed_capacity_indexes(self):
        self._check_payload(Query("Aaa").return_consumed_capacity_indexes(), ReturnConsumedCapacity="INDEXES")

    def test_return_consumed_capacity_none(self):
        self._check_payload(Query("Aaa").return_consumed_capacity_none(), ReturnConsumedCapacity="NONE")

    def test_filter_expression(self):
        self._check_payload(Query("Aaa").filter_expression("a=b"), FilterExpression="a=b")

    def test_consistent_read_true(self):
        self._check_payload(Query("Aaa").consistent_read_true(), ConsistentRead=True)

    def test_consistent_read_false(self):
        self._check_payload(Query("Aaa").consistent_read_false(), ConsistentRead=False)

    def test_index_name(self):
        self._check_payload(Query("Aaa").index_name("foo"), IndexName="foo")

    def test_scan_index_forward_true(self):
        self._check_payload(Query("Aaa").scan_index_forward_true(), ScanIndexForward=True)

    def test_scan_index_forward_false(self):
        self._check_payload(Query("Aaa").scan_index_forward_false(), ScanIndexForward=False)


class QueryResponseUnitTests(_tst.UnitTests):
    # Checks how QueryResponse exposes the fields it is constructed with.

    def test_all_none(self):
        # Without any field, every property is None.
        r = QueryResponse()
        self.assertIsNone(r.consumed_capacity)
        self.assertIsNone(r.count)
        self.assertIsNone(r.items)
        self.assertIsNone(r.last_evaluated_key)
        self.assertIsNone(r.scanned_count)

    def test_all_set(self):
        # With every field set, each property returns the converted value.
        r = QueryResponse(ConsumedCapacity={}, Count=1, Items=[{"h": {"S": "a"}}], LastEvaluatedKey={"h": {"S": "b"}}, ScannedCount=2)
        self.assertIsInstance(r.consumed_capacity, ConsumedCapacity)
        self.assertEqual(r.count, 1)
        self.assertEqual(r.items, [{"h": "a"}])
        self.assertEqual(r.last_evaluated_key, {"h": "b"})
        self.assertEqual(r.scanned_count, 2)