# localstack/services/events/resource_providers/aws_events_apidestination.py
# LocalStack Resource Provider Scaffolding v2
from __future__ import annotations
from pathlib import Path
from typing import Optional, TypedDict
import localstack.services.cloudformation.provider_utils as util
from localstack.services.cloudformation.resource_provider import (
OperationStatus,
ProgressEvent,
ResourceProvider,
ResourceRequest,
)
class EventsApiDestinationProperties(TypedDict):
    """Property model of an ``AWS::Events::ApiDestination`` resource.

    Every field is typed as optional; the grouping below follows the
    required / read-only notes in the provider's ``create`` docstring.
    """

    # Listed as required properties for create.
    ConnectionArn: Optional[str]
    InvocationEndpoint: Optional[str]
    HttpMethod: Optional[str]

    # Primary identifier (create-only).
    Name: Optional[str]

    # Remaining settings.
    Description: Optional[str]
    InvocationRateLimitPerSecond: Optional[int]

    # Read-only; filled in by the provider after creation.
    Arn: Optional[str]
# Marker string; it is not referenced anywhere in the visible code.
# NOTE(review): presumably intended as a custom_context key for detecting
# repeated handler invocations — confirm before relying on it.
REPEATED_INVOCATION = "repeated_invocation"
class EventsApiDestinationProvider(ResourceProvider[EventsApiDestinationProperties]):
    """CloudFormation resource provider for ``AWS::Events::ApiDestination``.

    ``create`` and ``delete`` are implemented through the ``events`` client
    taken from the request's client factory. ``read`` and ``update`` are not
    implemented and raise ``NotImplementedError``.
    """

    TYPE = "AWS::Events::ApiDestination"  # Autogenerated. Don't change
    SCHEMA = util.get_schema_path(Path(__file__))  # Autogenerated. Don't change

    def create(
        self,
        request: ResourceRequest[EventsApiDestinationProperties],
    ) -> ProgressEvent[EventsApiDestinationProperties]:
        """
        Create a new resource.

        Primary identifier fields:
          - /properties/Name

        Required properties:
          - ConnectionArn
          - InvocationEndpoint
          - HttpMethod

        Create-only properties:
          - /properties/Name

        Read-only properties:
          - /properties/Arn

        IAM permissions required:
          - events:CreateApiDestination
          - events:DescribeApiDestination

        :param request: resource request whose ``desired_state`` holds the
            properties to create the API destination with.
        :return: a SUCCESS progress event carrying the model, including the
            generated ``Name`` (if none was supplied) and the ``Arn`` taken
            from the API response.
        """
        model = request.desired_state
        events = request.aws_client_factory.events

        # ``Name`` is the primary identifier but is not a required property,
        # so it may be missing from the desired state. Derive a default from
        # the stack name and logical id so creation does not fail and
        # ``delete`` can later address the resource by name.
        if not model.get("Name"):
            model["Name"] = util.generate_default_name(
                request.stack_name, request.logical_resource_id
            )

        response = events.create_api_destination(**model)
        # Arn is read-only: it is only known once the service has answered.
        model["Arn"] = response["ApiDestinationArn"]

        return ProgressEvent(
            status=OperationStatus.SUCCESS,
            resource_model=model,
            custom_context=request.custom_context,
        )

    def read(
        self,
        request: ResourceRequest[EventsApiDestinationProperties],
    ) -> ProgressEvent[EventsApiDestinationProperties]:
        """
        Fetch resource information

        IAM permissions required:
          - events:DescribeApiDestination

        :raises NotImplementedError: always; reading is not implemented.
        """
        raise NotImplementedError

    def delete(
        self,
        request: ResourceRequest[EventsApiDestinationProperties],
    ) -> ProgressEvent[EventsApiDestinationProperties]:
        """
        Delete a resource

        IAM permissions required:
          - events:DeleteApiDestination
          - events:DescribeApiDestination

        :param request: resource request whose ``desired_state`` contains the
            ``Name`` of the API destination to delete.
        :return: a SUCCESS progress event carrying the unchanged model.
        """
        model = request.desired_state
        events = request.aws_client_factory.events

        # The API destination is addressed by its name (primary identifier).
        events.delete_api_destination(Name=model["Name"])

        return ProgressEvent(
            status=OperationStatus.SUCCESS,
            resource_model=model,
            custom_context=request.custom_context,
        )

    def update(
        self,
        request: ResourceRequest[EventsApiDestinationProperties],
    ) -> ProgressEvent[EventsApiDestinationProperties]:
        """
        Update a resource

        IAM permissions required:
          - events:UpdateApiDestination
          - events:DescribeApiDestination

        :raises NotImplementedError: always; updating is not implemented.
        """
        raise NotImplementedError