jacquev6/LowVoltage

View on GitHub
LowVoltage/actions/update_table.py

Summary

Maintainability
F
1 wk
Test Coverage
# coding: utf8

# Copyright 2014-2015 Vincent Jacques <vincent@vincent-jacques.net>

"""
When given a :class:`UpdateTable`, the connection will return a :class:`UpdateTableResponse`:

.. testsetup::

    table = "LowVoltage.Tests.Doc.UpdateTable.1"
    table2 = "LowVoltage.Tests.Doc.UpdateTable.2"
    table3 = "LowVoltage.Tests.Doc.UpdateTable.3"
    table4 = "LowVoltage.Tests.Doc.UpdateTable.4"
    connection(CreateTable(table).hash_key("h", STRING).provisioned_throughput(1, 1))
    connection(
        CreateTable(table2).hash_key("h", STRING).provisioned_throughput(1, 1)
            .global_secondary_index("gsi").hash_key("hh", STRING).range_key("rr", NUMBER).provisioned_throughput(1, 1).project_all()
    )
    connection(
        CreateTable(table3).hash_key("h", STRING).provisioned_throughput(1, 1)
            .global_secondary_index("gsi").hash_key("hh", STRING).range_key("rr", NUMBER).provisioned_throughput(1, 1).project_all()
    )
    connection(CreateTable(table4).hash_key("h", STRING).provisioned_throughput(1, 1))
    wait_for_table_activation(connection, table)
    wait_for_table_activation(connection, table2)
    wait_for_table_activation(connection, table3)
    wait_for_table_activation(connection, table4)

>>> r = connection(
...   UpdateTable(table)
...     .provisioned_throughput(2, 2)
... )
>>> r
<LowVoltage.actions.update_table.UpdateTableResponse ...>
>>> r.table_description.table_status
u'UPDATING'

Note that you can use the :func:`.wait_for_table_activation` compound to poll the table status until it's updated. See :ref:`actions-vs-compounds` in the user guide.

.. testcleanup::

    wait_for_table_activation(connection, table)
    wait_for_table_activation(connection, table2)
    wait_for_table_activation(connection, table3)
    wait_for_table_activation(connection, table4)
    connection(DeleteTable(table))
    connection(DeleteTable(table2))
    connection(DeleteTable(table3))
    connection(DeleteTable(table4))
    wait_for_table_deletion(connection, table)
    wait_for_table_deletion(connection, table2)
    wait_for_table_deletion(connection, table3)
    wait_for_table_deletion(connection, table4)
"""

import datetime

import LowVoltage as _lv
import LowVoltage.testing as _tst
from .action import Action
from .next_gen_mixins import variadic, proxy
from .next_gen_mixins import (
    TableName,
)
from .return_types import TableDescription, _is_dict


class UpdateTableResponse(object):
    """
    UpdateTableResponse()

    The `UpdateTable response <http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateTable.html#API_UpdateTable_ResponseElements>`__.
    """

    def __init__(self, TableDescription=None, **dummy):
        # Only the "TableDescription" field is kept; any other keyword
        # argument is accepted and ignored.
        self.__table_description = TableDescription

    @property
    def table_description(self):
        """
        The description of the table that was just updated, or ``None`` when the response did not carry one.

        :type: ``None`` or :class:`.TableDescription`
        """
        raw = self.__table_description
        if not _is_dict(raw):
            return None
        # Wrapped on each access: the raw dict is what is stored.
        return TableDescription(**raw)


class UpdateTable(Action):
    """
    The `UpdateTable request <http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateTable.html#API_UpdateTable_RequestParameters>`__.
    """

    def __init__(self, table_name=None):
        """
        Passing ``table_name`` to the constructor is like calling :meth:`table_name` on the new instance.
        """
        super(UpdateTable, self).__init__("UpdateTable", UpdateTableResponse)
        self.__table_name = TableName(self, table_name)
        # Attribute name -> attribute type, emitted as "AttributeDefinitions".
        self.__attribute_definitions = {}
        # Table-level throughput. These two attributes deliberately have the
        # same names as on _Index so that provisioned_throughput can write to
        # whichever object is currently active (the table or an index).
        self._read_capacity_units = None
        self._write_capacity_units = None
        # Index name -> _Index, emitted as "GlobalSecondaryIndexUpdates".
        self.__gsis = {}
        # Target of provisioned_throughput, hash_key, range_key and project*.
        # "self" means that the table itself (no index) is active.
        self.__active_index = self

    @property
    def payload(self):
        # Build the request body. Optional sections are only emitted when
        # something was set for them.
        # @todo Simplify, make more linear
        data = {}
        data.update(self.__table_name.payload)
        if self.__attribute_definitions:
            data["AttributeDefinitions"] = [
                {"AttributeName": name, "AttributeType": typ}
                for name, typ in self.__attribute_definitions.items()
            ]
        throughput = {}
        if self._read_capacity_units:
            throughput["ReadCapacityUnits"] = self._read_capacity_units
        if self._write_capacity_units:
            throughput["WriteCapacityUnits"] = self._write_capacity_units
        if throughput:
            data["ProvisionedThroughput"] = throughput
        if self.__gsis:
            data["GlobalSecondaryIndexUpdates"] = [index.payload for index in self.__gsis.values()]
        return data

    class _Index(object):
        # One entry of "GlobalSecondaryIndexUpdates".
        # "verb" is the key wrapping the entry in the payload: "Create",
        # "Update" or "Delete".

        def __init__(self, verb, name):
            self._verb = verb
            self.__name = name
            self._hash_key = None
            self._range_key = None
            # None, a projection type string ("ALL", "KEYS_ONLY"), or a list
            # of attribute names (meaning "INCLUDE").
            self._projection = None
            self._read_capacity_units = None
            self._write_capacity_units = None

        @property
        def payload(self):
            # Returns {verb: description}; the description always contains
            # "IndexName" and only the optional parts that were set.
            data = {"IndexName": self.__name}
            schema = []
            if self._hash_key:
                schema.append({"AttributeName": self._hash_key, "KeyType": "HASH"})
            if self._range_key:
                schema.append({"AttributeName": self._range_key, "KeyType": "RANGE"})
            if schema:
                data["KeySchema"] = schema
            if isinstance(self._projection, basestring):
                data["Projection"] = {"ProjectionType": self._projection}
            elif self._projection:
                data["Projection"] = {"ProjectionType": "INCLUDE", "NonKeyAttributes": self._projection}
            throughput = {}
            if self._read_capacity_units:
                throughput["ReadCapacityUnits"] = self._read_capacity_units
            if self._write_capacity_units:
                throughput["WriteCapacityUnits"] = self._write_capacity_units
            if throughput:
                data["ProvisionedThroughput"] = throughput
            return {self._verb: data}

    @proxy
    def table_name(self, table_name):
        """
        Set TableName.

        See :meth:`update_global_secondary_index` for an example.
        """
        return self.__table_name.set(table_name)

    def hash_key(self, name, typ=None):
        """
        Set the hash key in KeySchema for the active index.
        If you provide a second argument, :meth:`attribute_definition` will be called as well.

        :raise: :exc:`.BuilderError` if called when no index is active or if the active index is not being created.

        See :meth:`create_global_secondary_index` for an example.
        """
        self.__check_active_index()
        self.__active_index._hash_key = name
        if typ is not None:
            self.attribute_definition(name, typ)
        return self

    def range_key(self, name, typ=None):
        """
        Set the range key in KeySchema for the active index.
        If you provide a second argument, :meth:`attribute_definition` will be called as well.

        :raise: :exc:`.BuilderError` if called when no index is active or if the active index is not being created.

        See :meth:`create_global_secondary_index` for an example.
        """
        self.__check_active_index()
        self.__active_index._range_key = name
        if typ is not None:
            self.attribute_definition(name, typ)
        return self

    def attribute_definition(self, name, typ):
        """
        Set the type of an attribute in AttributeDefinitions.
        Key attributes must be typed. See :mod:`.attribute_types` for constants to be passed to this method.
        """
        self.__attribute_definitions[name] = typ
        return self

    def provisioned_throughput(self, read_capacity_units, write_capacity_units):
        """
        Set the new provisioned throughput for the table or the active index.

        See :meth:`create_global_secondary_index` for an example.
        """
        self.__active_index._read_capacity_units = read_capacity_units
        self.__active_index._write_capacity_units = write_capacity_units
        return self

    def create_global_secondary_index(self, name):
        """
        Create a new GSI.
        This method sets the active index: methods like :meth:`provisioned_throughput` will apply to the index.

        >>> connection(
        ...   UpdateTable(table4)
        ...     .create_global_secondary_index("gsi")
        ...       .hash_key("hh", STRING)
        ...       .range_key("rr", NUMBER)
        ...       .project_all()
        ...       .provisioned_throughput(2, 2)
        ... )
        <LowVoltage.actions.update_table.UpdateTableResponse ...>
        """
        # An entry already registered under this name is re-activated as is
        # (its verb is not changed).
        if name not in self.__gsis:
            self.__gsis[name] = self._Index("Create", name)
        self.__active_index = self.__gsis[name]
        return self

    def update_global_secondary_index(self, name):
        """
        Update an existing GSI.
        This method sets the active index: methods like :meth:`provisioned_throughput` will apply to the index.

        >>> connection(
        ...   UpdateTable()
        ...     .table_name(table2)
        ...     .update_global_secondary_index("gsi")
        ...       .provisioned_throughput(2, 2)
        ... )
        <LowVoltage.actions.update_table.UpdateTableResponse ...>
        """
        # An entry already registered under this name is re-activated as is
        # (its verb is not changed).
        if name not in self.__gsis:
            self.__gsis[name] = self._Index("Update", name)
        self.__active_index = self.__gsis[name]
        return self

    def delete_global_secondary_index(self, name):
        """
        Mark a GSI for deletion.

        This method does not make the deleted index active, because there is nothing to modify.
        If the index being deleted was the active one, the table becomes active again (as if :meth:`table` had been called).

        >>> connection(
        ...   UpdateTable(table3)
        ...     .delete_global_secondary_index("gsi")
        ... )
        <LowVoltage.actions.update_table.UpdateTableResponse ...>
        """
        replaced = self.__gsis.get(name)
        self.__gsis[name] = self._Index("Delete", name)
        # The replaced entry no longer reaches the payload. If it stayed
        # active, later builder calls would be silently applied to it and
        # lost, so fall back to the table.
        if replaced is not None and self.__active_index is replaced:
            self.__active_index = self
        return self

    def table(self):
        """
        Reset the active index: methods like :meth:`provisioned_throughput` will apply to the table.
        """
        self.__active_index = self
        return self

    def project_all(self):
        """
        Set ProjectionType to ALL for the active index.

        :raise: :exc:`.BuilderError` if called when no index is active or if the active index is not being created.

        See :meth:`create_global_secondary_index` for an example.
        """
        self.__check_active_index()
        self.__active_index._projection = "ALL"
        return self

    def project_keys_only(self):
        """
        Set ProjectionType to KEYS_ONLY for the active index.

        :raise: :exc:`.BuilderError` if called when no index is active or if the active index is not being created.
        """
        self.__check_active_index()
        self.__active_index._projection = "KEYS_ONLY"
        return self

    @variadic(basestring)
    def project(self, *attrs):
        """
        Set ProjectionType to INCLUDE for the active index and add names to NonKeyAttributes.

        :raise: :exc:`.BuilderError` if called when no index is active or if the active index is not being created.
        """
        self.__check_active_index()
        # A previous project_all/project_keys_only left a string here:
        # start a fresh list of names in that case.
        if not isinstance(self.__active_index._projection, list):
            self.__active_index._projection = []
        self.__active_index._projection.extend(attrs)
        return self

    def __check_active_index(self):
        # Key schema and projection can only be set on an index being created.
        if self.__active_index is self or self.__active_index._verb != "Create":
            raise _lv.BuilderError("No active index or active index not being created.")


class UpdateTableUnitTests(_tst.UnitTests):
    # Tests of the payload built by UpdateTable and of the errors raised by
    # its builder methods. The private helpers below factor out the parts
    # shared by most tests.

    __BUILDER_ERROR_ARGS = ("No active index or active index not being created.",)

    @staticmethod
    def __throughput(read, write):
        # Expected "ProvisionedThroughput" value.
        return {"ReadCapacityUnits": read, "WriteCapacityUnits": write}

    @staticmethod
    def __gsi(verb, **fields):
        # Expected "GlobalSecondaryIndexUpdates" entry for index "the_gsi".
        description = {"IndexName": "the_gsi"}
        description.update(fields)
        return {verb: description}

    def __check_payload(self, action, **extra):
        # The payload must be exactly {"TableName": "Foo"} plus "extra".
        expected = {"TableName": "Foo"}
        expected.update(extra)
        self.assertEqual(action.payload, expected)

    def __check_builder_error(self, build):
        # "build" is a callable performing the whole builder chain.
        with self.assertRaises(_lv.BuilderError) as catcher:
            build()
        self.assertEqual(catcher.exception.args, self.__BUILDER_ERROR_ARGS)

    def test_name(self):
        self.assertEqual(UpdateTable("Foo").name, "UpdateTable")

    def test_constructor(self):
        self.__check_payload(UpdateTable("Foo"))

    def test_table_name(self):
        self.__check_payload(UpdateTable().table_name("Foo"))

    def test_throughput(self):
        self.__check_payload(
            UpdateTable("Foo").provisioned_throughput(42, 43),
            ProvisionedThroughput=self.__throughput(42, 43)
        )

    def test_attribute_definition(self):
        self.__check_payload(
            UpdateTable("Foo").attribute_definition("a", "B"),
            AttributeDefinitions=[{"AttributeName": "a", "AttributeType": "B"}]
        )

    def test_create_gsi(self):
        self.__check_payload(
            UpdateTable("Foo").create_global_secondary_index("the_gsi"),
            GlobalSecondaryIndexUpdates=[self.__gsi("Create")]
        )

    def test_create_gsi_provisioned_throughput(self):
        self.__check_payload(
            UpdateTable("Foo").create_global_secondary_index("the_gsi").provisioned_throughput(1, 2),
            GlobalSecondaryIndexUpdates=[
                self.__gsi("Create", ProvisionedThroughput=self.__throughput(1, 2)),
            ]
        )

    def test_create_gsi_hash_key(self):
        self.__check_payload(
            UpdateTable("Foo").create_global_secondary_index("the_gsi").hash_key("h"),
            GlobalSecondaryIndexUpdates=[
                self.__gsi("Create", KeySchema=[{"AttributeName": "h", "KeyType": "HASH"}]),
            ]
        )

    def test_create_gsi_range_key(self):
        self.__check_payload(
            UpdateTable("Foo").create_global_secondary_index("the_gsi").range_key("r"),
            GlobalSecondaryIndexUpdates=[
                self.__gsi("Create", KeySchema=[{"AttributeName": "r", "KeyType": "RANGE"}]),
            ]
        )

    def test_create_gsi_hash_key_with_type(self):
        self.__check_payload(
            UpdateTable("Foo").create_global_secondary_index("the_gsi").hash_key("h", "S"),
            GlobalSecondaryIndexUpdates=[
                self.__gsi("Create", KeySchema=[{"AttributeName": "h", "KeyType": "HASH"}]),
            ],
            AttributeDefinitions=[{"AttributeName": "h", "AttributeType": "S"}]
        )

    def test_create_gsi_range_key_with_type(self):
        self.__check_payload(
            UpdateTable("Foo").create_global_secondary_index("the_gsi").range_key("r", "N"),
            GlobalSecondaryIndexUpdates=[
                self.__gsi("Create", KeySchema=[{"AttributeName": "r", "KeyType": "RANGE"}]),
            ],
            AttributeDefinitions=[{"AttributeName": "r", "AttributeType": "N"}]
        )

    def test_create_gsi_project_all(self):
        self.__check_payload(
            UpdateTable("Foo").create_global_secondary_index("the_gsi").project_all(),
            GlobalSecondaryIndexUpdates=[
                self.__gsi("Create", Projection={"ProjectionType": "ALL"}),
            ]
        )

    def test_create_gsi_project_keys_only(self):
        self.__check_payload(
            UpdateTable("Foo").create_global_secondary_index("the_gsi").project_keys_only(),
            GlobalSecondaryIndexUpdates=[
                self.__gsi("Create", Projection={"ProjectionType": "KEYS_ONLY"}),
            ]
        )

    def test_create_gsi_project(self):
        self.__check_payload(
            UpdateTable("Foo").create_global_secondary_index("the_gsi").project("a", ["b", "c"]).project("d"),
            GlobalSecondaryIndexUpdates=[
                self.__gsi(
                    "Create",
                    Projection={"ProjectionType": "INCLUDE", "NonKeyAttributes": ["a", "b", "c", "d"]}
                ),
            ]
        )

    def test_update_gsi(self):
        self.__check_payload(
            UpdateTable("Foo").update_global_secondary_index("the_gsi"),
            GlobalSecondaryIndexUpdates=[self.__gsi("Update")]
        )

    def test_update_gsi_provisioned_throughput(self):
        self.__check_payload(
            UpdateTable("Foo").update_global_secondary_index("the_gsi").provisioned_throughput(42, 43),
            GlobalSecondaryIndexUpdates=[
                self.__gsi("Update", ProvisionedThroughput=self.__throughput(42, 43)),
            ]
        )

    def test_delete_gsi(self):
        self.__check_payload(
            UpdateTable("Foo").delete_global_secondary_index("the_gsi"),
            GlobalSecondaryIndexUpdates=[self.__gsi("Delete")]
        )

    def test_back_to_update_gsi_after_back_to_table(self):
        action = UpdateTable("Foo")
        action.update_global_secondary_index("the_gsi").table().provisioned_throughput(12, 13)
        action.update_global_secondary_index("the_gsi").provisioned_throughput(42, 43)
        self.__check_payload(
            action,
            GlobalSecondaryIndexUpdates=[
                self.__gsi("Update", ProvisionedThroughput=self.__throughput(42, 43)),
            ],
            ProvisionedThroughput=self.__throughput(12, 13)
        )

    def test_back_to_create_gsi_after_back_to_table(self):
        action = UpdateTable("Foo")
        action.create_global_secondary_index("the_gsi").table().provisioned_throughput(12, 13)
        action.create_global_secondary_index("the_gsi").provisioned_throughput(42, 43)
        self.__check_payload(
            action,
            GlobalSecondaryIndexUpdates=[
                self.__gsi("Create", ProvisionedThroughput=self.__throughput(42, 43)),
            ],
            ProvisionedThroughput=self.__throughput(12, 13)
        )

    def test_back_to_update_gsi_after_back_to_table_after_create_gsi(self):
        # The index was first registered as a creation: it stays a "Create".
        action = UpdateTable("Foo")
        action.create_global_secondary_index("the_gsi").table().provisioned_throughput(12, 13)
        action.update_global_secondary_index("the_gsi").provisioned_throughput(42, 43)
        self.__check_payload(
            action,
            GlobalSecondaryIndexUpdates=[
                self.__gsi("Create", ProvisionedThroughput=self.__throughput(42, 43)),
            ],
            ProvisionedThroughput=self.__throughput(12, 13)
        )

    def test_hash_key_without_active_index(self):
        self.__check_builder_error(lambda: UpdateTable("Foo").hash_key("h"))

    def test_range_key_without_active_index(self):
        self.__check_builder_error(lambda: UpdateTable("Foo").range_key("r"))

    def test_project_all_without_active_index(self):
        self.__check_builder_error(lambda: UpdateTable("Foo").project_all())

    def test_project_without_active_index(self):
        self.__check_builder_error(lambda: UpdateTable("Foo").project("a"))

    def test_project_keys_only_without_active_index(self):
        self.__check_builder_error(lambda: UpdateTable("Foo").project_keys_only())

    def test_hash_key_with_updating_active_index(self):
        self.__check_builder_error(
            lambda: UpdateTable("Foo").update_global_secondary_index("gsi").hash_key("h")
        )

    def test_range_key_with_updating_active_index(self):
        self.__check_builder_error(
            lambda: UpdateTable("Foo").update_global_secondary_index("gsi").range_key("r")
        )

    def test_project_all_with_updating_active_index(self):
        self.__check_builder_error(
            lambda: UpdateTable("Foo").update_global_secondary_index("gsi").project_all()
        )

    def test_project_with_updating_active_index(self):
        self.__check_builder_error(
            lambda: UpdateTable("Foo").update_global_secondary_index("gsi").project("a")
        )

    def test_project_keys_only_with_updating_active_index(self):
        self.__check_builder_error(
            lambda: UpdateTable("Foo").update_global_secondary_index("gsi").project_keys_only()
        )


class UpdateTableResponseUnitTests(_tst.UnitTests):
    # Tests of the fields exposed by UpdateTableResponse.

    def test_all_none(self):
        # Nothing passed to the constructor: nothing to describe.
        self.assertIsNone(UpdateTableResponse().table_description)

    def test_all_set(self):
        # A dict (even an empty one) is wrapped in a TableDescription.
        response = UpdateTableResponse(TableDescription={})
        description = response.table_description
        self.assertIsInstance(description, TableDescription)