atlassian/localstack

View on GitHub
localstack/utils/cloudformation/template_deployer.py

Summary

Maintainability
F
3 days
Test Coverage
import json
import yaml
import logging
import traceback
from threading import local
from six import iteritems
from six import string_types
from localstack.utils import common
from localstack.utils.aws import aws_stack
from localstack.constants import DEFAULT_REGION

ACTION_CREATE = 'create'
PLACEHOLDER_RESOURCE_NAME = '__resource_name__'

# flag to indicate whether we are currently in the process of deployment
MARKER_DONT_REDEPLOY_STACK = 'markerToIndicateNotToRedeployStack'

LOGGER = logging.getLogger(__name__)

RESOURCE_TO_FUNCTION = {
    'S3::Bucket': {
        'create': {
            'boto_client': 'resource',
            'function': 'create_bucket',
            'parameters': {
                'Bucket': ['BucketName', PLACEHOLDER_RESOURCE_NAME],
                'ACL': 'AccessControl'
            }
        }
    },
    'SQS::Queue': {
        'create': {
            'boto_client': 'resource',
            'function': 'create_queue',
            'parameters': {
                'QueueName': 'QueueName'
            }
        }
    },
    'Logs::LogGroup': {
        # TODO implement
    },
    'Lambda::Function': {
        'create': {
            'boto_client': 'client',
            'function': 'create_function',
            'parameters': {
                'FunctionName': 'FunctionName',
                'Runtime': 'Runtime',
                'Role': 'Role',
                'Handler': 'Handler',
                'Code': 'Code',
                'Description': 'Description'
                # TODO add missing fields
            },
            'defaults': {
                'Role': 'test_role'
            }
        }
    },
    'Lambda::Version': {},
    'Lambda::Permission': {},
    'Lambda::EventSourceMapping': {
        'create': {
            'boto_client': 'client',
            'function': 'create_event_source_mapping',
            'parameters': {
                'FunctionName': 'FunctionName',
                'EventSourceArn': 'EventSourceArn',
                'StartingPosition': 'StartingPosition',
                'Enabled': 'Enabled',
                'BatchSize': 'BatchSize',
                'StartingPositionTimestamp': 'StartingPositionTimestamp'
            }
        }
    },
    'DynamoDB::Table': {
        'create': {
            'boto_client': 'client',
            'function': 'create_table',
            'parameters': {
                'TableName': 'TableName',
                'AttributeDefinitions': 'AttributeDefinitions',
                'KeySchema': 'KeySchema',
                'ProvisionedThroughput': 'ProvisionedThroughput',
                'LocalSecondaryIndexes': 'LocalSecondaryIndexes',
                'GlobalSecondaryIndexes': 'GlobalSecondaryIndexes',
                'StreamSpecification': 'StreamSpecification'
            },
            'defaults': {
                'ProvisionedThroughput': {
                    'ReadCapacityUnits': 5,
                    'WriteCapacityUnits': 5
                }
            }
        }
    },
    'IAM::Role': {
        # TODO implement
    },
    'ApiGateway::RestApi': {
        'create': {
            'boto_client': 'client',
            'function': 'create_rest_api',
            'parameters': {
                'name': 'Name',
                'description': 'Description'
            }
        }
    },
    'ApiGateway::Resource': {
        'create': {
            'boto_client': 'client',
            'function': 'create_resource',
            'parameters': {
                'restApiId': 'RestApiId',
                'pathPart': 'PathPart',
                'parentId': 'ParentId'
            }
        }
    },
    'ApiGateway::Method': {
        'create': {
            'boto_client': 'client',
            'function': 'put_method',
            'parameters': {
                'restApiId': 'RestApiId',
                'resourceId': 'ResourceId',
                'httpMethod': 'HttpMethod',
                'authorizationType': 'AuthorizationType',
                'requestParameters': 'RequestParameters'
            }
        }
    },
    'ApiGateway::Method::Integration': {
    },
    'ApiGateway::Deployment': {
        'create': {
            'boto_client': 'client',
            'function': 'create_deployment',
            'parameters': {
                'restApiId': 'RestApiId',
                'stageName': 'StageName',
                'stageDescription': 'StageDescription',
                'description': 'Description'
            }
        }
    }
}


def parse_template(template):
    try:
        return json.loads(template)
    except Exception as e:
        return yaml.load(template)


def template_to_json(template):
    template = parse_template(template)
    return json.dumps(template)


def get_resource_type(resource):
    return resource['Type'].split('::', 1)[1]


def get_service_name(resource):
    return resource['Type'].split('::')[1].lower()


def get_client(resource):
    resource_type = get_resource_type(resource)
    service = get_service_name(resource)
    resource_config = RESOURCE_TO_FUNCTION.get(resource_type)
    if resource_config is None:
        raise Exception('CloudFormation deployment for resource type %s not yet implemented' % resource_type)
    if ACTION_CREATE not in resource_config:
        # nothing to do for this resource
        return
    try:
        if resource_config[ACTION_CREATE].get('boto_client') == 'resource':
            return aws_stack.connect_to_resource(service)
        return aws_stack.connect_to_service(service)
    except Exception as e:
        LOGGER.warning('Unable to get client for "%s" API, skipping deployment.' % service)
        return None


def describe_stack_resources(stack_name, logical_resource_id):
    client = aws_stack.connect_to_service('cloudformation')
    resources = client.describe_stack_resources(StackName=stack_name, LogicalResourceId=logical_resource_id)
    result = []
    for res in resources['StackResources']:
        if res.get('LogicalResourceId') == logical_resource_id:
            result.append(res)
    return result


def retrieve_resource_details(resource_id, resource_status, resources, stack_name):
    resource = resources[resource_id]
    resource_id = resource_status.get('PhysicalResourceId') or resource_id
    resource_type = resource_status['ResourceType']
    if not resource:
        resource = {}
    resource_props = resource.get('Properties')
    try:
        if resource_type == 'AWS::Lambda::Function':
            resource_id = resource_props['FunctionName'] if resource else resource_id
            return aws_stack.connect_to_service('lambda').get_function(FunctionName=resource_id)
        if resource_type == 'AWS::Lambda::EventSourceMapping':
            resource_id = resource_props['FunctionName'] if resource else resource_id
            source_arn = resource_props.get('EventSourceArn')
            resource_id = resolve_refs_recursively(stack_name, resource_id, resources)
            source_arn = resolve_refs_recursively(stack_name, source_arn, resources)
            if not resource_id or not source_arn:
                raise Exception('ResourceNotFound')
            mappings = aws_stack.connect_to_service('lambda').list_event_source_mappings(
                FunctionName=resource_id, EventSourceArn=source_arn)
            mapping = list(filter(lambda m:
                m['EventSourceArn'] == source_arn and m['FunctionArn'] == aws_stack.lambda_function_arn(resource_id),
                mappings['EventSourceMappings']))
            if not mapping:
                raise Exception('ResourceNotFound')
            return mapping[0]
        if resource_type == 'AWS::DynamoDB::Table':
            return aws_stack.connect_to_service('dynamodb').describe_table(TableName=resource_id)
        if resource_type == 'AWS::ApiGateway::RestApi':
            apis = aws_stack.connect_to_service('apigateway').get_rest_apis()['items']
            resource_id = resource_props['Name'] if resource else resource_id
            result = list(filter(lambda api: api['name'] == resource_id, apis))
            return result[0] if result else None
        if resource_type == 'AWS::ApiGateway::Resource':
            api_id = resource_props['RestApiId'] if resource else resource_id
            api_id = resolve_refs_recursively(stack_name, api_id, resources)
            parent_id = resolve_refs_recursively(stack_name, resource_props['ParentId'], resources)
            if not api_id or not parent_id:
                return None
            api_resources = aws_stack.connect_to_service('apigateway').get_resources(restApiId=api_id)['items']
            target_resource = list(filter(lambda res:
                res.get('parentId') == parent_id and res['pathPart'] == resource_props['PathPart'], api_resources))
            if not target_resource:
                return None
            path = aws_stack.get_apigateway_path_for_resource(api_id,
                target_resource[0]['id'], resources=api_resources)
            result = list(filter(lambda res: res['path'] == path, api_resources))
            return result[0] if result else None
        if resource_type == 'AWS::ApiGateway::Deployment':
            api_id = resource_props['RestApiId'] if resource else resource_id
            api_id = resolve_refs_recursively(stack_name, api_id, resources)
            if not api_id:
                return None
            result = aws_stack.connect_to_service('apigateway').get_deployments(restApiId=api_id)['items']
            # TODO possibly filter results by stage name or other criteria
            return result[0] if result else None
        if resource_type == 'AWS::ApiGateway::Method':
            api_id = resolve_refs_recursively(stack_name, resource_props['RestApiId'], resources)
            res_id = resolve_refs_recursively(stack_name, resource_props['ResourceId'], resources)
            if not api_id or not res_id:
                return None
            return aws_stack.connect_to_service('apigateway').get_method(restApiId=api_id,
                resourceId=res_id, httpMethod=resource_props['HttpMethod'])
        if resource_type == 'AWS::S3::Bucket':
            return aws_stack.connect_to_service('s3').get_bucket_location(Bucket=resource_id)
        if resource_type == 'AWS::Logs::LogGroup':
            # TODO implement
            raise Exception('ResourceNotFound')
        if is_deployable_resource(resource):
            LOGGER.warning('Unexpected resource type %s when resolving references' % resource_type)
    except Exception as e:
        # we expect this to be a "not found" exception
        markers = ['NoSuchBucket', 'ResourceNotFound', '404']
        if not list(filter(lambda marker: marker in str(e), markers)):
            LOGGER.warning('Unexpected error retrieving details for resource %s: %s %s - %s %s' %
                (resource_type, e, traceback.format_exc(), resource, resource_status))
    return None


def extract_resource_attribute(resource_type, resource, attribute):
    LOGGER.debug('Extract resource attribute: %s %s' % (resource_type, attribute))
    # extract resource specific attributes
    if resource_type == 'Lambda::Function':
        actual_attribute = 'FunctionArn' if attribute == 'Arn' else attribute
        return resource['Configuration'][actual_attribute]
    elif resource_type == 'DynamoDB::Table':
        actual_attribute = 'LatestStreamArn' if attribute == 'StreamArn' else attribute
        value = resource['Table'].get(actual_attribute)
        return value
    elif resource_type == 'ApiGateway::RestApi':
        if attribute == 'PhysicalResourceId':
            return resource['id']
        if attribute == 'RootResourceId':
            resources = aws_stack.connect_to_service('apigateway').get_resources(restApiId=resource['id'])['items']
            for res in resources:
                if res['path'] == '/' and not res.get('parentId'):
                    return res['id']
    elif resource_type == 'ApiGateway::Resource':
        if attribute == 'PhysicalResourceId':
            return resource['id']
    result = resource.get(attribute)


def resolve_ref(stack_name, ref, resources, attribute):
    LOGGER.debug('Resolving ref %s - %s' % (ref, attribute))
    if ref == 'AWS::Region':
        return DEFAULT_REGION
    client = aws_stack.connect_to_service('cloudformation')
    resource_status = describe_stack_resources(stack_name, ref)[0]
    attr_value = resource_status.get(attribute)
    if attr_value not in [None, '']:
        return attr_value
    # fetch resource details
    resource = resources.get(ref)
    resource_new = retrieve_resource_details(ref, resource_status, resources, stack_name)
    if not resource_new:
        return
    resource_type = get_resource_type(resource)
    result = extract_resource_attribute(resource_type, resource_new, attribute)
    if not result:
        LOGGER.warning('Unable to extract reference attribute %s from resource: %s' % (attribute, resource_new))
    return result


def resolve_refs_recursively(stack_name, value, resources):
    if isinstance(value, dict):
        if len(value) == 1 and 'Ref' in value:
            return resolve_ref(stack_name, value['Ref'],
                resources, attribute='PhysicalResourceId')
        elif len(value) == 1 and 'Fn::GetAtt' in value:
            return resolve_ref(stack_name, value['Fn::GetAtt'][0],
                resources, attribute=value['Fn::GetAtt'][1])
        else:
            for key, val in iteritems(value):
                value[key] = resolve_refs_recursively(stack_name, val, resources)
        if len(value) == 1 and 'Fn::Join' in value:
            return value['Fn::Join'][0].join(value['Fn::Join'][1])
    if isinstance(value, list):
        for i in range(0, len(value)):
            value[i] = resolve_refs_recursively(stack_name, value[i], resources)
    return value


def set_status_deployed(resource_id, resource, stack_name):
    client = aws_stack.connect_to_service('cloudformation')
    template = {
        # TODO update deployment status
        MARKER_DONT_REDEPLOY_STACK: {}
    }
    # TODO: instead of calling update_stack, introduce a backdoor API method to
    # update the deployment status of individual resources. The problem with
    # using the code below is that it sets the status to UPDATE_COMPLETE which may
    # be undesirable (if the stack has just been created we expect CREATE_COMPLETE).
    # client.update_stack(StackName=stack_name, TemplateBody=json.dumps(template), UsePreviousTemplate=True)


def deploy_resource(resource_id, resources, stack_name):
    resource = resources[resource_id]
    client = get_client(resource)
    if not client:
        return False
    resource_type = get_resource_type(resource)
    func_details = RESOURCE_TO_FUNCTION.get(resource_type)
    if not func_details:
        LOGGER.warning('Resource type not yet implemented: %s' % resource['Type'])
        return
    LOGGER.debug('Deploying resource type "%s" id "%s"' % (resource_type, resource_id))
    func_details = func_details[ACTION_CREATE]
    function = getattr(client, func_details['function'])
    params = dict(func_details['parameters'])
    defaults = func_details.get('defaults', {})
    if 'Properties' not in resource:
        resource['Properties'] = {}
    resource_props = resource['Properties']
    for param_key, prop_keys in iteritems(dict(params)):
        params.pop(param_key, None)
        if not isinstance(prop_keys, list):
            prop_keys = [prop_keys]
        for prop_key in prop_keys:
            if prop_key == PLACEHOLDER_RESOURCE_NAME:
                # obtain physical resource name from stack resources
                params[param_key] = resolve_ref(stack_name, resource_id, resources,
                    attribute='PhysicalResourceId')
            else:
                prop_value = resource_props.get(prop_key)
                if prop_value is not None:
                    params[param_key] = prop_value
            tmp_value = params.get(param_key)
            if tmp_value is not None:
                params[param_key] = resolve_refs_recursively(stack_name, tmp_value, resources)
                break
        # hack: convert to boolean
        if params.get(param_key) in ['True', 'False']:
            params[param_key] = params.get(param_key) == 'True'
    # assign default value if empty
    params = common.merge_recursive(defaults, params)
    # invoke function
    try:
        result = function(**params)
    except Exception as e:
        LOGGER.warning('Error calling %s with params: %s for resource: %s' % (function, params, resource))
        raise e
    # some resources have attached/nested resources which we need to create recursively now
    if resource_type == 'ApiGateway::Method':
        integration = resource_props.get('Integration')
        if integration:
            api_id = resolve_refs_recursively(stack_name, resource_props['RestApiId'], resources)
            res_id = resolve_refs_recursively(stack_name, resource_props['ResourceId'], resources)
            uri = integration.get('Uri')
            if uri:
                uri = resolve_refs_recursively(stack_name, uri, resources)
                aws_stack.connect_to_service('apigateway').put_integration(restApiId=api_id, resourceId=res_id,
                    httpMethod=resource_props['HttpMethod'], type=integration['Type'],
                    integrationHttpMethod=integration['IntegrationHttpMethod'], uri=uri
                )
    # update status
    set_status_deployed(resource_id, resource, stack_name)
    return result


def deploy_template(template, stack_name):
    if isinstance(template, string_types):
        template = parse_template(template)

    if MARKER_DONT_REDEPLOY_STACK in template:
        # If we are currently deploying, then bail. This can occur if
        # deploy_template(..) method calls boto's update_stack(..) (to update the
        # state of resources) which itself triggers another call to deploy_template(..).
        # We don't want to end up in an infinite/recursive deployment loop.
        return

    resource_map = template.get('Resources')
    if not resource_map:
        LOGGER.warning('CloudFormation template contains no Resources section')
        return

    next = resource_map

    iters = 10
    for i in range(0, iters):

        # get resource details
        for resource_id, resource in iteritems(next):
            stack_resources = describe_stack_resources(stack_name, resource_id)
            resource['__details__'] = stack_resources[0]

        next = resources_to_deploy_next(resource_map, stack_name)
        if not next:
            return

        for resource_id, resource in iteritems(next):
            deploy_resource(resource_id, resource_map, stack_name=stack_name)

    LOGGER.warning('Unable to resolve all dependencies and deploy all resources ' +
        'after %s iterations. Remaining (%s): %s' % (iters, len(next), next))


# --------
# Util methods for analyzing resource dependencies
# --------

def is_deployable_resource(resource):
    resource_type = get_resource_type(resource)
    entry = RESOURCE_TO_FUNCTION.get(resource_type)
    if entry is None:
        LOGGER.warning('Unknown resource type "%s"' % resource_type)
    return entry and entry.get(ACTION_CREATE)


def is_deployed(resource_id, resources, stack_name):
    resource = resources[resource_id]
    resource_status = resource['__details__']
    details = retrieve_resource_details(resource_id, resource_status, resources, stack_name)
    return bool(details)


def all_dependencies_satisfied(resources, stack_name, all_resources, depending_resource=None):
    for resource_id, resource in iteritems(resources):
        if is_deployable_resource(resource):
            if not is_deployed(resource_id, all_resources, stack_name):
                return False
    return True


def resources_to_deploy_next(resources, stack_name):
    result = {}
    for resource_id, resource in iteritems(resources):
        if is_deployable_resource(resource) and not is_deployed(resource_id, resources, stack_name):
            res_deps = get_resource_dependencies(resource_id, resource, resources)
            if all_dependencies_satisfied(res_deps, stack_name, resources, resource_id):
                result[resource_id] = resource
    return result


def get_resource_dependencies(resource_id, resource, resources):
    result = {}
    dumped = json.dumps(common.json_safe(resource))
    for other_id, other in iteritems(resources):
        if resource != other:
            # TODO: traverse dict instead of doing string search
            search1 = '{"Ref": "%s"}' % other_id
            search2 = '{"Fn::GetAtt": ["%s", ' % other_id
            if search1 in dumped or search2 in dumped:
                result[other_id] = other
            if other_id in resource.get('DependsOn', []):
                result[other_id] = other
    return result