localstack/localstack

View on GitHub
localstack/services/events/resource_providers/aws_events_apidestination.py

Summary

Maintainability
A
0 mins
Test Coverage
# LocalStack Resource Provider Scaffolding v2
from __future__ import annotations

from pathlib import Path
from typing import Optional, TypedDict

import localstack.services.cloudformation.provider_utils as util
from localstack.services.cloudformation.resource_provider import (
    OperationStatus,
    ProgressEvent,
    ResourceProvider,
    ResourceRequest,
)


class EventsApiDestinationProperties(TypedDict):
    """Property model for the ``AWS::Events::ApiDestination`` resource type.

    Used as the type parameter of the provider's requests and progress events.
    The required / create-only / read-only classification noted below is the
    one listed in the provider's ``create`` handler docstring.
    """

    # Listed as required properties by the create handler.
    ConnectionArn: Optional[str]
    HttpMethod: Optional[str]
    InvocationEndpoint: Optional[str]
    # Read-only: filled in by the create handler from the "ApiDestinationArn"
    # field of the CreateApiDestination response.
    Arn: Optional[str]
    Description: Optional[str]
    InvocationRateLimitPerSecond: Optional[int]
    # Primary identifier and create-only property; also the key the delete
    # handler uses to address the resource.
    Name: Optional[str]


# NOTE(review): not referenced anywhere in this module's visible code.
# Presumably a scaffolding marker key for detecting handler re-invocation via
# `custom_context` -- confirm before relying on or removing it.
REPEATED_INVOCATION = "repeated_invocation"


class EventsApiDestinationProvider(ResourceProvider[EventsApiDestinationProperties]):
    """CloudFormation resource provider for ``AWS::Events::ApiDestination``.

    ``create`` and ``delete`` are implemented on top of the EventBridge client
    obtained from the request's client factory; ``read`` and ``update`` are not
    implemented and raise ``NotImplementedError``.
    """

    TYPE = "AWS::Events::ApiDestination"  # Autogenerated. Don't change
    SCHEMA = util.get_schema_path(Path(__file__))  # Autogenerated. Don't change

    def create(
        self,
        request: ResourceRequest[EventsApiDestinationProperties],
    ) -> ProgressEvent[EventsApiDestinationProperties]:
        """
        Create a new resource.

        The desired state is forwarded as keyword arguments to
        ``CreateApiDestination`` and the returned ARN is stored on the model.
        If the template does not provide a ``Name``, a default one is generated
        from the stack name and the logical resource id, because the API call
        requires a name while the resource type does not.

        Primary identifier fields:
          - /properties/Name

        Required properties:
          - ConnectionArn
          - InvocationEndpoint
          - HttpMethod

        Create-only properties:
          - /properties/Name

        Read-only properties:
          - /properties/Arn

        IAM permissions required:
          - events:CreateApiDestination
          - events:DescribeApiDestination

        :param request: resource request carrying the desired state and the
            AWS client factory
        :return: a SUCCESS progress event whose model includes ``Name`` and ``Arn``
        """
        model = request.desired_state
        events = request.aws_client_factory.events

        # Name is the primary identifier but not a required template property,
        # whereas CreateApiDestination rejects calls without it. Generate a
        # physical name so that templates omitting Name can still be deployed.
        if not model.get("Name"):
            model["Name"] = util.generate_default_name(
                request.stack_name, request.logical_resource_id
            )

        response = events.create_api_destination(**model)
        # Arn is read-only: it only becomes known once the service answers.
        model["Arn"] = response["ApiDestinationArn"]

        return ProgressEvent(
            status=OperationStatus.SUCCESS,
            resource_model=model,
            custom_context=request.custom_context,
        )

    def read(
        self,
        request: ResourceRequest[EventsApiDestinationProperties],
    ) -> ProgressEvent[EventsApiDestinationProperties]:
        """
        Fetch resource information

        Not implemented for this resource type.

        IAM permissions required:
          - events:DescribeApiDestination

        :raises NotImplementedError: always
        """
        raise NotImplementedError

    def delete(
        self,
        request: ResourceRequest[EventsApiDestinationProperties],
    ) -> ProgressEvent[EventsApiDestinationProperties]:
        """
        Delete a resource

        Deletes the API destination identified by the model's ``Name``.

        IAM permissions required:
          - events:DeleteApiDestination
          - events:DescribeApiDestination

        :param request: resource request whose desired state holds the ``Name``
            of the API destination to remove
        :return: a SUCCESS progress event carrying the unchanged model
        """
        model = request.desired_state
        events = request.aws_client_factory.events

        events.delete_api_destination(Name=model["Name"])
        return ProgressEvent(
            status=OperationStatus.SUCCESS,
            resource_model=model,
            custom_context=request.custom_context,
        )

    def update(
        self,
        request: ResourceRequest[EventsApiDestinationProperties],
    ) -> ProgressEvent[EventsApiDestinationProperties]:
        """
        Update a resource

        Not implemented for this resource type.

        IAM permissions required:
          - events:UpdateApiDestination
          - events:DescribeApiDestination

        :raises NotImplementedError: always
        """
        raise NotImplementedError