localstack/localstack

View on GitHub
localstack/services/cloudformation/provider.py

Summary

Maintainability
F
4 days
Test Coverage
import copy
import json
import logging
import re
from collections import defaultdict
from copy import deepcopy

from moto.cloudformation import cloudformation_backends

from localstack.aws.api import CommonServiceException, RequestContext, handler
from localstack.aws.api.cloudformation import (
    AlreadyExistsException,
    CallAs,
    ChangeSetNameOrId,
    ChangeSetNotFoundException,
    ChangeSetType,
    ClientRequestToken,
    CloudformationApi,
    CreateChangeSetInput,
    CreateChangeSetOutput,
    CreateStackInput,
    CreateStackInstancesInput,
    CreateStackInstancesOutput,
    CreateStackOutput,
    CreateStackSetInput,
    CreateStackSetOutput,
    DeleteChangeSetOutput,
    DeleteStackInstancesInput,
    DeleteStackInstancesOutput,
    DeleteStackSetOutput,
    DescribeChangeSetOutput,
    DescribeStackEventsOutput,
    DescribeStackResourceOutput,
    DescribeStackResourcesOutput,
    DescribeStackSetOperationOutput,
    DescribeStackSetOutput,
    DescribeStacksOutput,
    DisableRollback,
    EnableTerminationProtection,
    ExecuteChangeSetOutput,
    ExecutionStatus,
    ExportName,
    GetTemplateOutput,
    GetTemplateSummaryInput,
    GetTemplateSummaryOutput,
    IncludePropertyValues,
    InsufficientCapabilitiesException,
    InvalidChangeSetStatusException,
    ListChangeSetsOutput,
    ListExportsOutput,
    ListImportsOutput,
    ListStackInstancesInput,
    ListStackInstancesOutput,
    ListStackResourcesOutput,
    ListStackSetsInput,
    ListStackSetsOutput,
    ListStacksOutput,
    LogicalResourceId,
    NextToken,
    Parameter,
    PhysicalResourceId,
    RegisterTypeInput,
    RegisterTypeOutput,
    RetainExceptOnCreate,
    RetainResources,
    RoleARN,
    StackName,
    StackNameOrId,
    StackSetName,
    StackStatus,
    StackStatusFilter,
    TemplateParameter,
    TemplateStage,
    UpdateStackInput,
    UpdateStackOutput,
    UpdateStackSetInput,
    UpdateStackSetOutput,
    UpdateTerminationProtectionOutput,
    ValidateTemplateInput,
    ValidateTemplateOutput,
)
from localstack.aws.connect import connect_to
from localstack.services.cloudformation import api_utils
from localstack.services.cloudformation.engine import parameters as param_resolver
from localstack.services.cloudformation.engine import template_deployer, template_preparer
from localstack.services.cloudformation.engine.entities import (
    Stack,
    StackChangeSet,
    StackInstance,
    StackSet,
)
from localstack.services.cloudformation.engine.parameters import strip_parameter_type
from localstack.services.cloudformation.engine.template_deployer import NoStackUpdates
from localstack.services.cloudformation.engine.template_utils import resolve_stack_conditions
from localstack.services.cloudformation.engine.transformers import (
    FailedTransformationException,
)
from localstack.services.cloudformation.stores import (
    cloudformation_stores,
    find_active_stack_by_name_or_id,
    find_change_set,
    find_stack,
    find_stack_by_id,
    get_cloudformation_store,
)
from localstack.state import StateVisitor
from localstack.utils.collections import (
    remove_attributes,
    select_attributes,
    select_from_typed_dict,
)
from localstack.utils.json import clone
from localstack.utils.strings import long_uid, short_uid

LOG = logging.getLogger(__name__)

ARN_CHANGESET_REGEX = re.compile(
    r"arn:(aws|aws-us-gov|aws-cn):cloudformation:[-a-zA-Z0-9]+:\d{12}:changeSet/[a-zA-Z][-a-zA-Z0-9]*/[-a-zA-Z0-9:/._+]+"
)
ARN_STACK_REGEX = re.compile(
    r"arn:(aws|aws-us-gov|aws-cn):cloudformation:[-a-zA-Z0-9]+:\d{12}:stack/[a-zA-Z][-a-zA-Z0-9]*/[-a-zA-Z0-9:/._+]+"
)


def clone_stack_params(stack_params):
    try:
        return clone(stack_params)
    except Exception as e:
        LOG.info("Unable to clone stack parameters: %s", e)
        return stack_params


def find_stack_instance(stack_set: StackSet, account: str, region: str):
    for instance in stack_set.stack_instances:
        if instance.metadata["Account"] == account and instance.metadata["Region"] == region:
            return instance
    return None


def stack_not_found_error(stack_name: str):
    # FIXME
    raise ValidationError("Stack with id %s does not exist" % stack_name)


def not_found_error(message: str):
    # FIXME
    raise ResourceNotFoundException(message)


class ValidationError(CommonServiceException):
    """General validation error type (defined in the AWS docs, but not part of the botocore spec)"""

    def __init__(self, message=None):
        super().__init__("ValidationError", message=message, sender_fault=True)


class ResourceNotFoundException(CommonServiceException):
    def __init__(self, message=None):
        super().__init__("ResourceNotFoundException", status_code=404, message=message)


class InternalFailure(CommonServiceException):
    def __init__(self, message=None):
        super().__init__("InternalFailure", status_code=500, message=message, sender_fault=False)


class CloudformationProvider(CloudformationApi):
    def _stack_status_is_active(self, stack_status: str) -> bool:
        return stack_status not in [StackStatus.DELETE_COMPLETE]

    def accept_state_visitor(self, visitor: StateVisitor):
        visitor.visit(cloudformation_stores)
        visitor.visit(cloudformation_backends)

    @handler("CreateStack", expand=False)
    def create_stack(self, context: RequestContext, request: CreateStackInput) -> CreateStackOutput:
        # TODO: test what happens when both TemplateUrl and Body are specified
        state = get_cloudformation_store(context.account_id, context.region)

        stack_name = request.get("StackName")

        # get stacks by name
        active_stack_candidates = [
            s
            for s in state.stacks.values()
            if s.stack_name == stack_name and self._stack_status_is_active(s.status)
        ]

        # TODO: fix/implement this code path
        #   this needs more investigation how Cloudformation handles it (e.g. normal stack create or does it create a separate changeset?)
        # REVIEW_IN_PROGRESS is another special status
        # in this case existing changesets are set to obsolete and the stack is created
        # review_stack_candidates = [s for s in stack_candidates if s.status == StackStatus.REVIEW_IN_PROGRESS]
        # if review_stack_candidates:
        # set changesets to obsolete
        # for cs in review_stack_candidates[0].change_sets:
        #     cs.execution_status = ExecutionStatus.OBSOLETE

        if active_stack_candidates:
            raise AlreadyExistsException(f"Stack [{stack_name}] already exists")

        template_body = request.get("TemplateBody") or ""
        if len(template_body) > 51200:
            raise ValidationError(
                f'1 validation error detected: Value \'{request["TemplateBody"]}\' at \'templateBody\' '
                "failed to satisfy constraint: Member must have length less than or equal to 51200"
            )
        api_utils.prepare_template_body(request)  # TODO: avoid mutating request directly

        template = template_preparer.parse_template(request["TemplateBody"])
        stack_name = template["StackName"] = request.get("StackName")
        if api_utils.validate_stack_name(stack_name) is False:
            raise ValidationError(
                f"1 validation error detected: Value '{stack_name}' at 'stackName' failed to satisfy constraint:\
                Member must satisfy regular expression pattern: [a-zA-Z][-a-zA-Z0-9]*|arn:[-a-zA-Z0-9:/._+]*"
            )

        if (
            "CAPABILITY_AUTO_EXPAND" not in request.get("Capabilities", [])
            and "Transform" in template.keys()
        ):
            raise InsufficientCapabilitiesException(
                "Requires capabilities : [CAPABILITY_AUTO_EXPAND]"
            )

        # resolve stack parameters
        new_parameters = param_resolver.convert_stack_parameters_to_dict(request.get("Parameters"))
        parameter_declarations = param_resolver.extract_stack_parameter_declarations(template)
        resolved_parameters = param_resolver.resolve_parameters(
            account_id=context.account_id,
            region_name=context.region,
            parameter_declarations=parameter_declarations,
            new_parameters=new_parameters,
            old_parameters={},
        )

        # handle conditions
        stack = Stack(context.account_id, context.region, request, template)

        try:
            template = template_preparer.transform_template(
                context.account_id,
                context.region,
                template,
                stack.stack_name,
                stack.resources,
                stack.mappings,
                {},  # TODO
                resolved_parameters,
            )
        except FailedTransformationException as e:
            stack.add_stack_event(
                stack.stack_name,
                stack.stack_id,
                status="ROLLBACK_IN_PROGRESS",
                status_reason=e.message,
            )
            stack.set_stack_status("ROLLBACK_COMPLETE")
            state.stacks[stack.stack_id] = stack
            return CreateStackOutput(StackId=stack.stack_id)

        stack = Stack(context.account_id, context.region, request, template)

        # resolve conditions
        raw_conditions = template.get("Conditions", {})
        resolved_stack_conditions = resolve_stack_conditions(
            account_id=context.account_id,
            region_name=context.region,
            conditions=raw_conditions,
            parameters=resolved_parameters,
            mappings=stack.mappings,
            stack_name=stack_name,
        )
        stack.set_resolved_stack_conditions(resolved_stack_conditions)

        stack.set_resolved_parameters(resolved_parameters)
        stack.template_body = json.dumps(template)
        state.stacks[stack.stack_id] = stack
        LOG.debug(
            'Creating stack "%s" with %s resources ...',
            stack.stack_name,
            len(stack.template_resources),
        )
        deployer = template_deployer.TemplateDeployer(context.account_id, context.region, stack)
        try:
            deployer.deploy_stack()
        except Exception as e:
            stack.set_stack_status("CREATE_FAILED")
            msg = 'Unable to create stack "%s": %s' % (stack.stack_name, e)
            LOG.exception("%s")
            raise ValidationError(msg) from e

        return CreateStackOutput(StackId=stack.stack_id)

    @handler("DeleteStack")
    def delete_stack(
        self,
        context: RequestContext,
        stack_name: StackName,
        retain_resources: RetainResources = None,
        role_arn: RoleARN = None,
        client_request_token: ClientRequestToken = None,
        **kwargs,
    ) -> None:
        stack = find_active_stack_by_name_or_id(context.account_id, context.region, stack_name)
        if not stack:
            # aws will silently ignore invalid stack names - we should do the same
            return
        deployer = template_deployer.TemplateDeployer(context.account_id, context.region, stack)
        deployer.delete_stack()

    @handler("UpdateStack", expand=False)
    def update_stack(
        self,
        context: RequestContext,
        request: UpdateStackInput,
    ) -> UpdateStackOutput:
        stack_name = request.get("StackName")
        stack = find_stack(context.account_id, context.region, stack_name)
        if not stack:
            return not_found_error(f'Unable to update non-existing stack "{stack_name}"')

        api_utils.prepare_template_body(request)
        template = template_preparer.parse_template(request["TemplateBody"])

        if (
            "CAPABILITY_AUTO_EXPAND" not in request.get("Capabilities", [])
            and "Transform" in template.keys()
        ):
            raise InsufficientCapabilitiesException(
                "Requires capabilities : [CAPABILITY_AUTO_EXPAND]"
            )

        new_parameters: dict[str, Parameter] = param_resolver.convert_stack_parameters_to_dict(
            request.get("Parameters")
        )
        parameter_declarations = param_resolver.extract_stack_parameter_declarations(template)
        resolved_parameters = param_resolver.resolve_parameters(
            account_id=context.account_id,
            region_name=context.region,
            parameter_declarations=parameter_declarations,
            new_parameters=new_parameters,
            old_parameters=stack.resolved_parameters,
        )

        resolved_stack_conditions = resolve_stack_conditions(
            account_id=context.account_id,
            region_name=context.region,
            conditions=template.get("Conditions", {}),
            parameters=resolved_parameters,
            mappings=template.get("Mappings", {}),
            stack_name=stack_name,
        )

        raw_new_template = copy.deepcopy(template)
        try:
            template = template_preparer.transform_template(
                context.account_id,
                context.region,
                template,
                stack.stack_name,
                stack.resources,
                stack.mappings,
                resolved_stack_conditions,
                resolved_parameters,
            )
            processed_template = copy.deepcopy(
                template
            )  # copying it here since it's being mutated somewhere downstream
        except FailedTransformationException as e:
            stack.add_stack_event(
                stack.stack_name,
                stack.stack_id,
                status="ROLLBACK_IN_PROGRESS",
                status_reason=e.message,
            )
            stack.set_stack_status("ROLLBACK_COMPLETE")
            return CreateStackOutput(StackId=stack.stack_id)

        deployer = template_deployer.TemplateDeployer(context.account_id, context.region, stack)
        # TODO: there shouldn't be a "new" stack on update
        new_stack = Stack(
            context.account_id, context.region, request, template, request["TemplateBody"]
        )
        new_stack.set_resolved_parameters(resolved_parameters)
        stack.set_resolved_parameters(resolved_parameters)
        stack.set_resolved_stack_conditions(resolved_stack_conditions)
        try:
            deployer.update_stack(new_stack)
        except NoStackUpdates as e:
            stack.set_stack_status("UPDATE_COMPLETE")
            if raw_new_template != processed_template:
                # processed templates seem to never return an exception here
                return UpdateStackOutput(StackId=stack.stack_id)
            raise ValidationError(str(e))
        except Exception as e:
            stack.set_stack_status("UPDATE_FAILED")
            msg = f'Unable to update stack "{stack_name}": {e}'
            LOG.exception("%s", msg)
            raise ValidationError(msg) from e

        return UpdateStackOutput(StackId=stack.stack_id)

    @handler("DescribeStacks")
    def describe_stacks(
        self,
        context: RequestContext,
        stack_name: StackName = None,
        next_token: NextToken = None,
        **kwargs,
    ) -> DescribeStacksOutput:
        # TODO: test & implement pagination
        state = get_cloudformation_store(context.account_id, context.region)

        if stack_name:
            if ARN_STACK_REGEX.match(stack_name):
                # we can get the stack directly since we index the store by ARN/stackID
                stack = state.stacks.get(stack_name)
                stacks = [stack.describe_details()] if stack else []
            else:
                # otherwise we have to find the active stack with the given name
                stack_candidates: list[Stack] = [
                    s for stack_arn, s in state.stacks.items() if s.stack_name == stack_name
                ]
                active_stack_candidates = [
                    s for s in stack_candidates if self._stack_status_is_active(s.status)
                ]
                stacks = [s.describe_details() for s in active_stack_candidates]
        else:
            # return all active stacks
            stack_list = list(state.stacks.values())
            stacks = [
                s.describe_details() for s in stack_list if self._stack_status_is_active(s.status)
            ]

        if stack_name and not stacks:
            raise ValidationError(f"Stack with id {stack_name} does not exist")

        return DescribeStacksOutput(Stacks=stacks)

    @handler("ListStacks")
    def list_stacks(
        self,
        context: RequestContext,
        next_token: NextToken = None,
        stack_status_filter: StackStatusFilter = None,
        **kwargs,
    ) -> ListStacksOutput:
        state = get_cloudformation_store(context.account_id, context.region)

        stacks = [
            s.describe_details()
            for s in state.stacks.values()
            if not stack_status_filter or s.status in stack_status_filter
        ]

        attrs = [
            "StackId",
            "StackName",
            "TemplateDescription",
            "CreationTime",
            "LastUpdatedTime",
            "DeletionTime",
            "StackStatus",
            "StackStatusReason",
            "ParentId",
            "RootId",
            "DriftInformation",
        ]
        stacks = [select_attributes(stack, attrs) for stack in stacks]
        return ListStacksOutput(StackSummaries=stacks)

    @handler("GetTemplate")
    def get_template(
        self,
        context: RequestContext,
        stack_name: StackName = None,
        change_set_name: ChangeSetNameOrId = None,
        template_stage: TemplateStage = None,
        **kwargs,
    ) -> GetTemplateOutput:
        if change_set_name:
            stack = find_change_set(
                context.account_id, context.region, stack_name=stack_name, cs_name=change_set_name
            )
        else:
            stack = find_stack(context.account_id, context.region, stack_name)
        if not stack:
            return stack_not_found_error(stack_name)

        if template_stage == TemplateStage.Processed and "Transform" in stack.template_body:
            copy_template = clone(stack.template_original)
            copy_template.pop("ChangeSetName", None)
            copy_template.pop("StackName", None)
            for resource in copy_template.get("Resources", {}).values():
                resource.pop("LogicalResourceId", None)
            template_body = json.dumps(copy_template)
        else:
            template_body = stack.template_body

        return GetTemplateOutput(
            TemplateBody=template_body,
            StagesAvailable=[TemplateStage.Original, TemplateStage.Processed],
        )

    @handler("GetTemplateSummary", expand=False)
    def get_template_summary(
        self,
        context: RequestContext,
        request: GetTemplateSummaryInput,
    ) -> GetTemplateSummaryOutput:
        stack_name = request.get("StackName")

        if stack_name:
            stack = find_stack(context.account_id, context.region, stack_name)
            if not stack:
                return stack_not_found_error(stack_name)
            template = stack.template
        else:
            api_utils.prepare_template_body(request)
            template = template_preparer.parse_template(request["TemplateBody"])
            request["StackName"] = "tmp-stack"
            stack = Stack(context.account_id, context.region, request, template)

        result: GetTemplateSummaryOutput = stack.describe_details()

        # build parameter declarations
        result["Parameters"] = list(
            param_resolver.extract_stack_parameter_declarations(template).values()
        )

        id_summaries = defaultdict(list)
        for resource_id, resource in stack.template_resources.items():
            res_type = resource["Type"]
            id_summaries[res_type].append(resource_id)

        result["ResourceTypes"] = list(id_summaries.keys())
        result["ResourceIdentifierSummaries"] = [
            {"ResourceType": key, "LogicalResourceIds": values}
            for key, values in id_summaries.items()
        ]
        result["Metadata"] = stack.template.get("Metadata")
        result["Version"] = stack.template.get("AWSTemplateFormatVersion", "2010-09-09")
        # these do not appear in the output
        result.pop("Capabilities", None)

        return select_from_typed_dict(GetTemplateSummaryOutput, result)

    def update_termination_protection(
        self,
        context: RequestContext,
        enable_termination_protection: EnableTerminationProtection,
        stack_name: StackNameOrId,
        **kwargs,
    ) -> UpdateTerminationProtectionOutput:
        stack = find_stack(context.account_id, context.region, stack_name)
        if not stack:
            raise ValidationError(f"Stack '{stack_name}' does not exist.")
        stack.metadata["EnableTerminationProtection"] = enable_termination_protection
        return UpdateTerminationProtectionOutput(StackId=stack.stack_id)

    @handler("CreateChangeSet", expand=False)
    def create_change_set(
        self, context: RequestContext, request: CreateChangeSetInput
    ) -> CreateChangeSetOutput:
        state = get_cloudformation_store(context.account_id, context.region)

        req_params = request
        change_set_type = req_params.get("ChangeSetType", "UPDATE")
        stack_name = req_params.get("StackName")
        change_set_name = req_params.get("ChangeSetName")
        template_body = req_params.get("TemplateBody")
        # s3 or secretsmanager url
        template_url = req_params.get("TemplateURL")

        # validate and resolve template
        if template_body and template_url:
            raise ValidationError(
                "Specify exactly one of 'TemplateBody' or 'TemplateUrl'"
            )  # TODO: check proper message

        if not template_body and not template_url:
            raise ValidationError(
                "Specify exactly one of 'TemplateBody' or 'TemplateUrl'"
            )  # TODO: check proper message

        api_utils.prepare_template_body(
            req_params
        )  # TODO: function has too many unclear responsibilities
        if not template_body:
            template_body = req_params[
                "TemplateBody"
            ]  # should then have been set by prepare_template_body
        template = template_preparer.parse_template(req_params["TemplateBody"])

        del req_params["TemplateBody"]  # TODO: stop mutating req_params
        template["StackName"] = stack_name
        # TODO: validate with AWS what this is actually doing?
        template["ChangeSetName"] = change_set_name

        # this is intentionally not in a util yet. Let's first see how the different operations deal with these before generalizing
        # handle ARN stack_name here (not valid for initial CREATE, since stack doesn't exist yet)
        if ARN_STACK_REGEX.match(stack_name):
            if not (stack := state.stacks.get(stack_name)):
                raise ValidationError(f"Stack '{stack_name}' does not exist.")
        else:
            # stack name specified, so fetch the stack by name
            stack_candidates: list[Stack] = [
                s for stack_arn, s in state.stacks.items() if s.stack_name == stack_name
            ]
            active_stack_candidates = [
                s for s in stack_candidates if self._stack_status_is_active(s.status)
            ]

            # on a CREATE an empty Stack should be generated if we didn't find an active one
            if not active_stack_candidates and change_set_type == ChangeSetType.CREATE:
                empty_stack_template = dict(template)
                empty_stack_template["Resources"] = {}
                req_params_copy = clone_stack_params(req_params)
                stack = Stack(
                    context.account_id,
                    context.region,
                    req_params_copy,
                    empty_stack_template,
                    template_body=template_body,
                )
                state.stacks[stack.stack_id] = stack
                stack.set_stack_status("REVIEW_IN_PROGRESS")
            else:
                if not active_stack_candidates:
                    raise ValidationError(f"Stack '{stack_name}' does not exist.")
                stack = active_stack_candidates[0]

        # TODO: test if rollback status is allowed as well
        if (
            change_set_type == ChangeSetType.CREATE
            and stack.status != StackStatus.REVIEW_IN_PROGRESS
        ):
            raise ValidationError(
                f"Stack [{stack_name}] already exists and cannot be created again with the changeSet [{change_set_name}]."
            )

        old_parameters: dict[str, Parameter] = {}
        match change_set_type:
            case ChangeSetType.UPDATE:
                # add changeset to existing stack
                old_parameters = {
                    k: strip_parameter_type(v) for k, v in stack.resolved_parameters.items()
                }
            case ChangeSetType.IMPORT:
                raise NotImplementedError()  # TODO: implement importing resources
            case ChangeSetType.CREATE:
                pass
            case _:
                msg = (
                    f"1 validation error detected: Value '{change_set_type}' at 'changeSetType' failed to satisfy "
                    f"constraint: Member must satisfy enum value set: [IMPORT, UPDATE, CREATE] "
                )
                raise ValidationError(msg)

        # resolve parameters
        new_parameters: dict[str, Parameter] = param_resolver.convert_stack_parameters_to_dict(
            request.get("Parameters")
        )
        parameter_declarations = param_resolver.extract_stack_parameter_declarations(template)
        resolved_parameters = param_resolver.resolve_parameters(
            account_id=context.account_id,
            region_name=context.region,
            parameter_declarations=parameter_declarations,
            new_parameters=new_parameters,
            old_parameters=old_parameters,
        )

        # TODO: remove this when fixing Stack.resources and transformation order
        #   currently we need to create a stack with existing resources + parameters so that resolve refs recursively in here will work.
        #   The correct way to do it would be at a later stage anyway just like a normal intrinsic function
        req_params_copy = clone_stack_params(req_params)
        temp_stack = Stack(context.account_id, context.region, req_params_copy, template)
        temp_stack.set_resolved_parameters(resolved_parameters)

        # TODO: everything below should be async
        # apply template transformations
        transformed_template = template_preparer.transform_template(
            context.account_id,
            context.region,
            template,
            stack_name=temp_stack.stack_name,
            resources=temp_stack.resources,
            mappings=temp_stack.mappings,
            conditions={},  # TODO: we don't have any resolved conditions yet at this point but we need the conditions because of the samtranslator...
            resolved_parameters=resolved_parameters,
        )

        # create change set for the stack and apply changes
        change_set = StackChangeSet(
            context.account_id, context.region, stack, req_params, transformed_template
        )
        # only set parameters for the changeset, then switch to stack on execute_change_set
        change_set.set_resolved_parameters(resolved_parameters)
        change_set.template_body = template_body

        # TODO: evaluate conditions
        raw_conditions = transformed_template.get("Conditions", {})
        resolved_stack_conditions = resolve_stack_conditions(
            account_id=context.account_id,
            region_name=context.region,
            conditions=raw_conditions,
            parameters=resolved_parameters,
            mappings=temp_stack.mappings,
            stack_name=stack_name,
        )
        change_set.set_resolved_stack_conditions(resolved_stack_conditions)

        deployer = template_deployer.TemplateDeployer(
            context.account_id, context.region, change_set
        )
        changes = deployer.construct_changes(
            stack,
            change_set,
            change_set_id=change_set.change_set_id,
            append_to_changeset=True,
            filter_unchanged_resources=True,
        )
        stack.change_sets.append(change_set)
        if not changes:
            change_set.metadata["Status"] = "FAILED"
            change_set.metadata["ExecutionStatus"] = "UNAVAILABLE"
            change_set.metadata["StatusReason"] = (
                "The submitted information didn't contain changes. Submit different information to create a change set."
            )
        else:
            change_set.metadata["Status"] = (
                "CREATE_COMPLETE"  # technically for some time this should first be CREATE_PENDING
            )
            change_set.metadata["ExecutionStatus"] = (
                "AVAILABLE"  # technically for some time this should first be UNAVAILABLE
            )

        return CreateChangeSetOutput(StackId=change_set.stack_id, Id=change_set.change_set_id)

    @handler("DescribeChangeSet")
    def describe_change_set(
        self,
        context: RequestContext,
        change_set_name: ChangeSetNameOrId,
        stack_name: StackNameOrId = None,
        next_token: NextToken = None,
        include_property_values: IncludePropertyValues = None,
        **kwargs,
    ) -> DescribeChangeSetOutput:
        # TODO add support for include_property_values
        # only relevant if change_set_name isn't an ARN
        if not ARN_CHANGESET_REGEX.match(change_set_name):
            if not stack_name:
                raise ValidationError(
                    "StackName must be specified if ChangeSetName is not specified as an ARN."
                )

            stack = find_stack(context.account_id, context.region, stack_name)
            if not stack:
                raise ValidationError(f"Stack [{stack_name}] does not exist")

        change_set = find_change_set(
            context.account_id, context.region, change_set_name, stack_name=stack_name
        )
        if not change_set:
            raise ChangeSetNotFoundException(f"ChangeSet [{change_set_name}] does not exist")

        attrs = [
            "ChangeSetType",
            "StackStatus",
            "LastUpdatedTime",
            "DisableRollback",
            "EnableTerminationProtection",
            "Transform",
        ]
        result = remove_attributes(deepcopy(change_set.metadata), attrs)
        # TODO: replace this patch with a better solution
        result["Parameters"] = [strip_parameter_type(p) for p in result.get("Parameters", [])]
        return result

    @handler("DeleteChangeSet")
    def delete_change_set(
        self,
        context: RequestContext,
        change_set_name: ChangeSetNameOrId,
        stack_name: StackNameOrId = None,
        **kwargs,
    ) -> DeleteChangeSetOutput:
        # only relevant if change_set_name isn't an ARN
        if not ARN_CHANGESET_REGEX.match(change_set_name):
            if not stack_name:
                raise ValidationError(
                    "StackName must be specified if ChangeSetName is not specified as an ARN."
                )

            stack = find_stack(context.account_id, context.region, stack_name)
            if not stack:
                raise ValidationError(f"Stack [{stack_name}] does not exist")

        change_set = find_change_set(
            context.account_id, context.region, change_set_name, stack_name=stack_name
        )
        if not change_set:
            raise ChangeSetNotFoundException(f"ChangeSet [{change_set_name}] does not exist")
        change_set.stack.change_sets = [
            cs
            for cs in change_set.stack.change_sets
            if change_set_name not in (cs.change_set_name, cs.change_set_id)
        ]
        return DeleteChangeSetOutput()

    @handler("ExecuteChangeSet")
    def execute_change_set(
        self,
        context: RequestContext,
        change_set_name: ChangeSetNameOrId,
        stack_name: StackNameOrId = None,
        client_request_token: ClientRequestToken = None,
        disable_rollback: DisableRollback = None,
        retain_except_on_create: RetainExceptOnCreate = None,
        **kwargs,
    ) -> ExecuteChangeSetOutput:
        change_set = find_change_set(
            context.account_id, context.region, change_set_name, stack_name=stack_name
        )
        if not change_set:
            raise ChangeSetNotFoundException(f"ChangeSet [{change_set_name}] does not exist")
        if change_set.metadata.get("ExecutionStatus") != ExecutionStatus.AVAILABLE:
            LOG.debug("Change set %s not in execution status 'AVAILABLE'", change_set_name)
            raise InvalidChangeSetStatusException(
                f"ChangeSet [{change_set.metadata['ChangeSetId']}] cannot be executed in its current status of [{change_set.metadata.get('Status')}]"
            )
        stack_name = change_set.stack.stack_name
        LOG.debug(
            'Executing change set "%s" for stack "%s" with %s resources ...',
            change_set_name,
            stack_name,
            len(change_set.template_resources),
        )
        deployer = template_deployer.TemplateDeployer(
            context.account_id, context.region, change_set.stack
        )
        try:
            deployer.apply_change_set(change_set)
            change_set.stack.metadata["ChangeSetId"] = change_set.change_set_id
        except NoStackUpdates:
            # TODO: parity-check if this exception should be re-raised or swallowed
            raise ValidationError("No updates to be performed for stack change set")

        return ExecuteChangeSetOutput()

    @handler("ListChangeSets")
    def list_change_sets(
        self,
        context: RequestContext,
        stack_name: StackNameOrId,
        next_token: NextToken = None,
        **kwargs,
    ) -> ListChangeSetsOutput:
        stack = find_stack(context.account_id, context.region, stack_name)
        if not stack:
            return not_found_error(f'Unable to find stack "{stack_name}"')
        result = [cs.metadata for cs in stack.change_sets]
        return ListChangeSetsOutput(Summaries=result)

    @handler("ListExports")
    def list_exports(
        self, context: RequestContext, next_token: NextToken = None, **kwargs
    ) -> ListExportsOutput:
        state = get_cloudformation_store(context.account_id, context.region)
        return ListExportsOutput(Exports=state.exports)

    @handler("ListImports")
    def list_imports(
        self,
        context: RequestContext,
        export_name: ExportName,
        next_token: NextToken = None,
        **kwargs,
    ) -> ListImportsOutput:
        state = get_cloudformation_store(context.account_id, context.region)

        importing_stack_names = []
        for stack in state.stacks.values():
            if export_name in stack.imports:
                importing_stack_names.append(stack.stack_name)

        return ListImportsOutput(Imports=importing_stack_names)

    @handler("DescribeStackEvents")
    def describe_stack_events(
        self,
        context: RequestContext,
        stack_name: StackName = None,
        next_token: NextToken = None,
        **kwargs,
    ) -> DescribeStackEventsOutput:
        if stack_name is None:
            raise ValidationError(
                "1 validation error detected: Value null at 'stackName' failed to satisfy constraint: Member must not be null"
            )

        stack = find_active_stack_by_name_or_id(context.account_id, context.region, stack_name)
        if not stack:
            stack = find_stack_by_id(
                account_id=context.account_id, region_name=context.region, stack_id=stack_name
            )
        if not stack:
            raise ValidationError(f"Stack [{stack_name}] does not exist")
        return DescribeStackEventsOutput(StackEvents=stack.events)

    @handler("DescribeStackResource")
    def describe_stack_resource(
        self,
        context: RequestContext,
        stack_name: StackName,
        logical_resource_id: LogicalResourceId,
        **kwargs,
    ) -> DescribeStackResourceOutput:
        stack = find_stack(context.account_id, context.region, stack_name)

        if not stack:
            return stack_not_found_error(stack_name)

        details = stack.resource_status(logical_resource_id)
        return DescribeStackResourceOutput(StackResourceDetail=details)

    @handler("DescribeStackResources")
    def describe_stack_resources(
        self,
        context: RequestContext,
        stack_name: StackName = None,
        logical_resource_id: LogicalResourceId = None,
        physical_resource_id: PhysicalResourceId = None,
        **kwargs,
    ) -> DescribeStackResourcesOutput:
        if physical_resource_id and stack_name:
            raise ValidationError("Cannot specify both StackName and PhysicalResourceId")
        # TODO: filter stack by PhysicalResourceId!
        stack = find_stack(context.account_id, context.region, stack_name)
        if not stack:
            return stack_not_found_error(stack_name)
        statuses = [
            res_status
            for res_id, res_status in stack.resource_states.items()
            if logical_resource_id in [res_id, None]
        ]
        for status in statuses:
            status.setdefault("DriftInformation", {"StackResourceDriftStatus": "NOT_CHECKED"})
        return DescribeStackResourcesOutput(StackResources=statuses)

    @handler("ListStackResources")
    def list_stack_resources(
        self, context: RequestContext, stack_name: StackName, next_token: NextToken = None, **kwargs
    ) -> ListStackResourcesOutput:
        result = self.describe_stack_resources(context, stack_name)

        resources = deepcopy(result.get("StackResources", []))
        for resource in resources:
            attrs = ["StackName", "StackId", "Timestamp", "PreviousResourceStatus"]
            remove_attributes(resource, attrs)

        return ListStackResourcesOutput(StackResourceSummaries=resources)

    @handler("ValidateTemplate", expand=False)
    def validate_template(
        self, context: RequestContext, request: ValidateTemplateInput
    ) -> ValidateTemplateOutput:
        try:
            # TODO implement actual validation logic
            template_body = api_utils.get_template_body(request)
            valid_template = json.loads(template_preparer.template_to_json(template_body))

            parameters = [
                TemplateParameter(
                    ParameterKey=k,
                    DefaultValue=v.get("Default", ""),
                    NoEcho=False,
                    Description=v.get("Description", ""),
                )
                for k, v in valid_template.get("Parameters", {}).items()
            ]

            return ValidateTemplateOutput(
                Description=valid_template.get("Description"), Parameters=parameters
            )
        except Exception as e:
            LOG.exception("Error validating template")
            raise ValidationError("Template Validation Error") from e

    # =======================================
    # =============  Stack Set  =============
    # =======================================

    @handler("CreateStackSet", expand=False)
    def create_stack_set(
        self, context: RequestContext, request: CreateStackSetInput
    ) -> CreateStackSetOutput:
        state = get_cloudformation_store(context.account_id, context.region)
        stack_set = StackSet(request)
        stack_set_id = f"{stack_set.stack_set_name}:{long_uid()}"
        stack_set.metadata["StackSetId"] = stack_set_id
        state.stack_sets[stack_set_id] = stack_set

        return CreateStackSetOutput(StackSetId=stack_set_id)

    @handler("DescribeStackSetOperation")
    def describe_stack_set_operation(
        self,
        context: RequestContext,
        stack_set_name: StackSetName,
        operation_id: ClientRequestToken,
        call_as: CallAs = None,
        **kwargs,
    ) -> DescribeStackSetOperationOutput:
        state = get_cloudformation_store(context.account_id, context.region)

        set_name = stack_set_name

        stack_set = [sset for sset in state.stack_sets.values() if sset.stack_set_name == set_name]
        if not stack_set:
            return not_found_error(f'Unable to find stack set "{set_name}"')
        stack_set = stack_set[0]
        result = stack_set.operations.get(operation_id)
        if not result:
            LOG.debug(
                'Unable to find operation ID "%s" for stack set "%s" in list: %s',
                operation_id,
                set_name,
                list(stack_set.operations.keys()),
            )
            return not_found_error(
                f'Unable to find operation ID "{operation_id}" for stack set "{set_name}"'
            )

        return DescribeStackSetOperationOutput(StackSetOperation=result)

    @handler("DescribeStackSet")
    def describe_stack_set(
        self,
        context: RequestContext,
        stack_set_name: StackSetName,
        call_as: CallAs = None,
        **kwargs,
    ) -> DescribeStackSetOutput:
        state = get_cloudformation_store(context.account_id, context.region)
        result = [
            sset.metadata
            for sset in state.stack_sets.values()
            if sset.stack_set_name == stack_set_name
        ]
        if not result:
            return not_found_error(f'Unable to find stack set "{stack_set_name}"')

        return DescribeStackSetOutput(StackSet=result[0])

    @handler("ListStackSets", expand=False)
    def list_stack_sets(
        self, context: RequestContext, request: ListStackSetsInput
    ) -> ListStackSetsOutput:
        state = get_cloudformation_store(context.account_id, context.region)
        result = [sset.metadata for sset in state.stack_sets.values()]
        return ListStackSetsOutput(Summaries=result)

    @handler("UpdateStackSet", expand=False)
    def update_stack_set(
        self, context: RequestContext, request: UpdateStackSetInput
    ) -> UpdateStackSetOutput:
        state = get_cloudformation_store(context.account_id, context.region)
        set_name = request.get("StackSetName")
        stack_set = [sset for sset in state.stack_sets.values() if sset.stack_set_name == set_name]
        if not stack_set:
            return not_found_error(f'Stack set named "{set_name}" does not exist')
        stack_set = stack_set[0]
        stack_set.metadata.update(request)
        op_id = request.get("OperationId") or short_uid()
        operation = {
            "OperationId": op_id,
            "StackSetId": stack_set.metadata["StackSetId"],
            "Action": "UPDATE",
            "Status": "SUCCEEDED",
        }
        stack_set.operations[op_id] = operation
        return UpdateStackSetOutput(OperationId=op_id)

    @handler("DeleteStackSet")
    def delete_stack_set(
        self,
        context: RequestContext,
        stack_set_name: StackSetName,
        call_as: CallAs = None,
        **kwargs,
    ) -> DeleteStackSetOutput:
        state = get_cloudformation_store(context.account_id, context.region)
        stack_set = [
            sset for sset in state.stack_sets.values() if sset.stack_set_name == stack_set_name
        ]

        if not stack_set:
            return not_found_error(f'Stack set named "{stack_set_name}" does not exist')

        # TODO: add a check for remaining stack instances

        for instance in stack_set[0].stack_instances:
            deployer = template_deployer.TemplateDeployer(
                context.account_id, context.region, instance.stack
            )
            deployer.delete_stack()
        return DeleteStackSetOutput()

    @handler("CreateStackInstances", expand=False)
    def create_stack_instances(
        self,
        context: RequestContext,
        request: CreateStackInstancesInput,
    ) -> CreateStackInstancesOutput:
        state = get_cloudformation_store(context.account_id, context.region)

        set_name = request.get("StackSetName")
        stack_set = [sset for sset in state.stack_sets.values() if sset.stack_set_name == set_name]

        if not stack_set:
            return not_found_error(f'Stack set named "{set_name}" does not exist')

        stack_set = stack_set[0]
        op_id = request.get("OperationId") or short_uid()
        sset_meta = stack_set.metadata
        accounts = request["Accounts"]
        regions = request["Regions"]

        stacks_to_await = []
        for account in accounts:
            for region in regions:
                # deploy new stack
                LOG.debug(
                    'Deploying instance for stack set "%s" in account: %s region %s',
                    set_name,
                    account,
                    region,
                )
                cf_client = connect_to(aws_access_key_id=account, region_name=region).cloudformation
                kwargs = select_attributes(sset_meta, ["TemplateBody"]) or select_attributes(
                    sset_meta, ["TemplateURL"]
                )
                stack_name = f"sset-{set_name}-{account}"

                # skip creation of existing stacks
                if find_stack(context.account_id, context.region, stack_name):
                    continue

                result = cf_client.create_stack(StackName=stack_name, **kwargs)
                stacks_to_await.append((stack_name, account, region))
                # store stack instance
                instance = {
                    "StackSetId": sset_meta["StackSetId"],
                    "OperationId": op_id,
                    "Account": account,
                    "Region": region,
                    "StackId": result["StackId"],
                    "Status": "CURRENT",
                    "StackInstanceStatus": {"DetailedStatus": "SUCCEEDED"},
                }
                instance = StackInstance(instance)
                stack_set.stack_instances.append(instance)

        # wait for completion of stack
        for stack_name, account_id, region_name in stacks_to_await:
            client = connect_to(
                aws_access_key_id=account_id, region_name=region_name
            ).cloudformation
            client.get_waiter("stack_create_complete").wait(StackName=stack_name)

        # record operation
        operation = {
            "OperationId": op_id,
            "StackSetId": stack_set.metadata["StackSetId"],
            "Action": "CREATE",
            "Status": "SUCCEEDED",
        }
        stack_set.operations[op_id] = operation

        return CreateStackInstancesOutput(OperationId=op_id)

    @handler("ListStackInstances", expand=False)
    def list_stack_instances(
        self,
        context: RequestContext,
        request: ListStackInstancesInput,
    ) -> ListStackInstancesOutput:
        set_name = request.get("StackSetName")
        state = get_cloudformation_store(context.account_id, context.region)
        stack_set = [sset for sset in state.stack_sets.values() if sset.stack_set_name == set_name]
        if not stack_set:
            return not_found_error(f'Stack set named "{set_name}" does not exist')

        stack_set = stack_set[0]
        result = [inst.metadata for inst in stack_set.stack_instances]
        return ListStackInstancesOutput(Summaries=result)

    @handler("DeleteStackInstances", expand=False)
    def delete_stack_instances(
        self,
        context: RequestContext,
        request: DeleteStackInstancesInput,
    ) -> DeleteStackInstancesOutput:
        op_id = request.get("OperationId") or short_uid()

        accounts = request["Accounts"]
        regions = request["Regions"]

        state = get_cloudformation_store(context.account_id, context.region)
        stack_sets = state.stack_sets.values()

        set_name = request.get("StackSetName")
        stack_set = next((sset for sset in stack_sets if sset.stack_set_name == set_name), None)

        if not stack_set:
            return not_found_error(f'Stack set named "{set_name}" does not exist')

        for account in accounts:
            for region in regions:
                instance = find_stack_instance(stack_set, account, region)
                if instance:
                    stack_set.stack_instances.remove(instance)

        # record operation
        operation = {
            "OperationId": op_id,
            "StackSetId": stack_set.metadata["StackSetId"],
            "Action": "DELETE",
            "Status": "SUCCEEDED",
        }
        stack_set.operations[op_id] = operation

        return DeleteStackInstancesOutput(OperationId=op_id)

    @handler("RegisterType", expand=False)
    def register_type(
        self,
        context: RequestContext,
        request: RegisterTypeInput,
    ) -> RegisterTypeOutput:
        return RegisterTypeOutput()