Enterprise-CMCS/macpro-appian-connector

View on GitHub
src/services/connector/serverless.yml

Summary

Maintainability
Test Coverage
# Service name: the project name (custom.project) followed by "-connector".
service: ${self:custom.project}-connector

# Major version of the Serverless Framework this configuration targets.
frameworkVersion: "3"

# Package each function as its own artifact instead of one shared bundle.
package:
  individually: true

# Serverless plugins loaded for this service, in load order.
# NOTE(review): serverless-plugin-scripts presumably consumes custom.scripts
# and serverless-stack-termination-protection presumably consumes
# custom.serverlessTerminationProtection — confirm against each plugin's docs.
plugins:
  - serverless-bundle
  - serverless-plugin-scripts
  - serverless-stack-termination-protection
  - "@stratiformdigital/serverless-iam-helper"
  - "@stratiformdigital/serverless-idempotency-helper"
  - "@stratiformdigital/serverless-online"
  - "@stratiformdigital/serverless-s3-security-helper"

# AWS provider settings and the IAM role statements shared by the functions
# defined in this service.
provider:
  name: aws
  runtime: nodejs18.x
  region: ${env:REGION_A}
  stackTags:
    PROJECT: ${self:custom.project}
    SERVICE: ${self:service}
  iam:
    role:
      path: /delegatedadmin/developer/
      permissionsBoundary: >-
        arn:aws:iam::${aws:accountId}:policy/cms-cloud-admin/developer-boundary-policy
      statements:
        # Network-interface management and VPC lookups (every function in
        # this file is attached to VPC subnets).
        - Effect: Allow
          Resource: "*"
          Action:
            - ec2:CreateNetworkInterface
            - ec2:DeleteNetworkInterface
            - ec2:DetachNetworkInterface
            - ec2:DescribeNetworkInterfaces
            - ec2:DescribeSecurityGroups
            - ec2:DescribeSubnets
            - ec2:DescribeVpcs
        # Read-only discovery of ECS tasks.
        - Effect: Allow
          Resource: "*"
          Action:
            - ecs:ListTasks
            - ecs:DescribeTasks
        # Publishing CloudWatch metrics.
        - Effect: Allow
          Resource: "*"
          Action:
            - cloudwatch:PutMetricData

custom:
  # Project name, taken from the PROJECT environment variable.
  project: ${env:PROJECT}
  serverlessTerminationProtection:
    # Apply CloudFormation termination protection for these stages
    stages:
      - master
      - val
      - production
  # Each lookup below reads the stage-specific secret first and falls back to
  # the second source (the project's "default" secret, or a literal image).
  vpc: >-
    ${ssm:/aws/reference/secretsmanager/${self:custom.project}/${sls:stage}/vpc,
    ssm:/aws/reference/secretsmanager/${self:custom.project}/default/vpc}
  brokerString: >-
    ${ssm:/aws/reference/secretsmanager/${self:custom.project}/${sls:stage}/brokerString,
    ssm:/aws/reference/secretsmanager/${self:custom.project}/default/brokerString}
  dbInfo: >-
    ${ssm:/aws/reference/secretsmanager/${self:custom.project}/${sls:stage}/dbInfo,
    ssm:/aws/reference/secretsmanager/${self:custom.project}/default/dbInfo}
  connectImage: >-
    ${ssm:/aws/reference/secretsmanager/ecr/images/${self:custom.project}/${self:service},
    "confluentinc/cp-kafka-connect:6.0.9"}
  scripts:
    hooks:
      # After a deploy, synchronously invoke the configureConnectors function
      # and abort with a non-zero exit when Lambda reports a function error.
      #
      # The invoke call writes the function payload to /dev/stdout and the CLI
      # then prints its own JSON response, so $call can hold more than one
      # JSON value. jq slurps them all (-s) and, with -e, exits 0 only when
      # some object carries a "FunctionError" key. A plain
      # `jq '.FunctionError'` exits 0 whether or not the key is present, so it
      # cannot be used as the condition.
      deploy:finalize: |
        set -e
        call=`aws lambda invoke --region ${self:provider.region} --function-name ${self:service}-${sls:stage}-configureConnectors --invocation-type RequestResponse --log Tail /dev/stdout`
        if echo "$call" | jq -e -s 'any(.[]; type == "object" and has("FunctionError"))' > /dev/null; then
          echo "The Lambda function did not succeed." && exit 1;
        fi
    # Ad-hoc helper commands (presumably exposed on the CLI by
    # serverless-plugin-scripts — verify).
    commands:
      # Lists the RUNNING tasks of the <service>-<stage>-connect ECS cluster
      # and, for each one, prints ready-to-paste commands: a port-forward of
      # the Kafka Connect REST API (8083) via ecs-exec-pf, and `ecs
      # execute-command` shells into the "connect" and "instantclient"
      # containers. ${task##*/} strips the ARN down to the task ID.
      # NOTE(review): the script uses array syntax, so it presumably needs a
      # bash-compatible shell — confirm which shell runs it.
      connect: |
        runningTasks=(`aws --region ${self:provider.region} ecs list-tasks --cluster ${self:service}-${sls:stage}-connect --desired-status RUNNING | jq -r ".taskArns[]"`)
        echo "\nThe following command(s) may be used to exec onto running fargate tasks.  Note, if there are no commands shown, there are no runnings tasks:"
        for task in "${runningTasks[@]}"
        do
          echo """
          To forward the connector's Kafka Connect REST API to your localhost:8083, run:
          ecs-exec-pf -c ${self:service}-${sls:stage}-connect -t ${task##*/} -p 8083 -l 8083

          To conect to the connector, run:
          aws --region ${self:provider.region} ecs execute-command --cluster ${self:service}-${sls:stage}-connect --task ${task##*/} --container connect --interactive --command "/bin/sh"

          To connect to the sql instantclient, run:
          aws --region ${self:provider.region} ecs execute-command --cluster ${self:service}-${sls:stage}-connect --task ${task##*/} --container instantclient --interactive --command "/bin/sh"
          """
        done

# Stage parameters. topicNamespace is the prefix put in front of topic names:
# empty for master, val and production; for any other stage it is built from
# the project and stage names.
params:
  default:
    topicNamespace: "--${self:custom.project}--${sls:stage}--"
  master:
    topicNamespace: ""
  val:
    topicNamespace: ""
  production:
    topicNamespace: ""

functions:
  # Handler behind the CreateTopics custom resource (its ServiceToken points
  # at this function). Runs in the private subnets.
  createTopics:
    handler: handlers/createTopics.handler
    timeout: 300
    vpc:
      subnetIds: ${self:custom.vpc.privateSubnets}
      securityGroupIds:
        - !Ref LambdaSecurityGroup
  # Handler behind the CleanupTopics custom resource (its ServiceToken points
  # at this function). Runs in the private subnets.
  cleanupKafka:
    handler: handlers/cleanupKafka.handler
    timeout: 300
    vpc:
      subnetIds: ${self:custom.vpc.privateSubnets}
      securityGroupIds:
        - !Ref LambdaSecurityGroup
  # Invoked by the deploy:finalize hook. Receives the ECS cluster/service
  # references, the topic prefix and the legacy database connection details
  # through environment variables.
  configureConnectors:
    handler: handlers/configureConnectors.handler
    timeout: 300
    # No automatic retries for asynchronous invocations.
    maximumRetryAttempts: 0
    vpc:
      subnetIds: ${self:custom.vpc.privateSubnets}
      securityGroupIds:
        - !Ref LambdaConfigureConnectorsSecurityGroup
    environment:
      cluster: !Ref KafkaConnectCluster
      service: !Ref KafkaConnectService
      topicNamespace: ${param:topicNamespace}
      legacydbIp: ${self:custom.dbInfo.ip}
      legacydbPort: ${self:custom.dbInfo.port}
      legacyDb: ${self:custom.dbInfo.db}
      legacydbUser: ${self:custom.dbInfo.user}
      legacydbPassword: ${self:custom.dbInfo.password}
      legacyschema: ${self:custom.dbInfo.schema}
  # Scheduled every minute. "namespace" carries the same value as the
  # Namespace of the Jdbc* alarms in this file.
  testConnectors:
    handler: handlers/testConnectors.handler
    timeout: 300
    events:
      - schedule: cron(0/1 * ? * * *)
    vpc:
      subnetIds: ${self:custom.vpc.privateSubnets}
      securityGroupIds:
        - !Ref LambdaConfigureConnectorsSecurityGroup
    environment:
      cluster: !Ref KafkaConnectCluster
      service: !Ref KafkaConnectService
      namespace: ${self:service}-${sls:stage}

# Raw CloudFormation added to the stack alongside the functions above.
resources:
  Conditions:
    # True when topicNamespace is non-empty. Per the params block that is any
    # stage other than master, val and production.
    isDev:
      Fn::Not:
        - Fn::Equals:
            - ${param:topicNamespace}
            - ""
  Resources:
    # Custom resource backed by the createTopics function; passes it the
    # broker string and the list of topics to create.
    CreateTopics:
      Type: AWS::CloudFormation::CustomResource
      Properties:
        ServiceToken:
          Fn::GetAtt: [CreateTopicsLambdaFunction, Arn]
        BrokerString: ${self:custom.brokerString}
        TopicsToCreate:
          - name: "${param:topicNamespace}aws.appian.cmcs.MCP_SPA_PCKG"
    # Custom resource backed by the cleanupKafka function; passes it the
    # broker string and the topic name patterns to delete.
    CleanupTopics:
      Type: AWS::CloudFormation::CustomResource
      # We only clean up topics in lower environments where isDev is true
      Condition: isDev
      Properties:
        ServiceToken:
          Fn::GetAtt: [CleanupKafkaLambdaFunction, Arn]
        BrokerString: ${self:custom.brokerString}
        TopicPatternsToDelete:
          - "${param:topicNamespace}aws.appian.cmcs*"
          - "${param:topicNamespace}mgmt.connect.${self:service}*"
    # Security group attached to the createTopics and cleanupKafka functions.
    LambdaSecurityGroup:
      Type: AWS::EC2::SecurityGroup
      Properties:
        VpcId: ${self:custom.vpc.id}
        GroupDescription: Security Group for the topics lambda function
    # Unrestricted outbound traffic: all protocols, any destination.
    LambdaSecurityGroupEgress:
      Type: AWS::EC2::SecurityGroupEgress
      Properties:
        GroupId:
          Ref: LambdaSecurityGroup
        CidrIp: 0.0.0.0/0
        IpProtocol: -1
    # Log group that receives the "connect" container's awslogs output and
    # feeds the ConnectorLogs* metric filters.
    KafkaConnectWorkerLogGroup:
      Type: AWS::Logs::LogGroup
      Properties:
        LogGroupName: "/aws/fargate/${self:service}-${sls:stage}-kafka-connect"
    # Security group for the Fargate Kafka Connect tasks.
    KafkaConnectWorkerSecurityGroup:
      Type: AWS::EC2::SecurityGroup
      Properties:
        VpcId: ${self:custom.vpc.id}
        GroupDescription: Security Group for the Fargate Connect Workers.
    # Unrestricted outbound traffic: all protocols, any destination.
    KafkaConnectWorkerSecurityGroupEgressLambda:
      Type: AWS::EC2::SecurityGroupEgress
      Properties:
        GroupId:
          Ref: KafkaConnectWorkerSecurityGroup
        CidrIp: 0.0.0.0/0
        IpProtocol: -1
    # TCP 8083 inbound from the security group used by the
    # configureConnectors/testConnectors functions.
    KafkaConnectWorkerSecurityGroupIngressLambda:
      Type: AWS::EC2::SecurityGroupIngress
      Properties:
        GroupId: !Ref KafkaConnectWorkerSecurityGroup
        SourceSecurityGroupId: !Ref LambdaConfigureConnectorsSecurityGroup
        IpProtocol: tcp
        FromPort: 8083
        ToPort: 8083
    # TCP 8083 inbound from members of the worker security group itself.
    KafkaConnectWorkerSecurityGroupIngressCluster:
      Type: AWS::EC2::SecurityGroupIngress
      Properties:
        GroupId: !Ref KafkaConnectWorkerSecurityGroup
        SourceSecurityGroupId: !Ref KafkaConnectWorkerSecurityGroup
        IpProtocol: tcp
        FromPort: 8083
        ToPort: 8083
    # IAM role assumable by ECS and ECS tasks. The Kafka Connect task
    # definition uses it as both its execution role and its task role.
    KafkaConnectWorkerRole:
      Type: AWS::IAM::Role
      Properties:
        AssumeRolePolicyDocument:
          Version: "2012-10-17"
          Statement:
            - Effect: Allow
              Action: sts:AssumeRole
              Principal:
                Service:
                  - ecs.amazonaws.com
                  - ecs-tasks.amazonaws.com
        ManagedPolicyArns:
          - arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy
        Policies:
          - PolicyName: LambdaRolePolicy
            PolicyDocument:
              Version: "2012-10-17"
              Statement:
                # Session Manager message channels (presumably what the
                # service's EnableExecuteCommand relies on — verify).
                - Effect: Allow
                  Resource: "*"
                  Action:
                    - ssmmessages:CreateControlChannel
                    - ssmmessages:CreateDataChannel
                    - ssmmessages:OpenControlChannel
                    - ssmmessages:OpenDataChannel
                # Image pulls from any ECR repository in this account/region.
                - Effect: Allow
                  Resource: !Sub "arn:aws:ecr:${self:provider.region}:${AWS::AccountId}:repository/*"
                  Action:
                    - ecr:BatchGetImage
    # Fargate task definition with two containers:
    #   - connect:       the Kafka Connect worker (image from custom.connectImage)
    #   - instantclient: an Oracle Instant Client image that only sleeps, kept
    #                    around as a shell target (see custom.scripts.commands)
    # NOTE(review): the container-level Memory limits (4096 + 2048) add up to
    # more than the task-level Memory of 4GB — confirm this is intended.
    KafkaConnectWorkerTaskDefinition:
      Type: "AWS::ECS::TaskDefinition"
      Properties:
        ContainerDefinitions:
          - Name: connect
            Image: ${self:custom.connectImage}
            Memory: 4096
            Cpu: 2048
            # Starts as root because the startup script rewrites /etc/hosts;
            # the worker itself is launched as appuser via runuser.
            User: "root"
            # Startup script, in order:
            #   1. read the task's IPv4 address from the ECS metadata endpoint
            #      ($ECS_CONTAINER_METADATA_URI_V4) into ENI_IP;
            #   2. overwrite /etc/hosts so "localhost" maps to that address and
            #      persist ENI_IP in appuser's .bashrc;
            #   3. as appuser: set the Connect REST host name variables to
            #      ENI_IP, download the confluent-hub client, install the JDBC
            #      connector (10.5.1), fetch the Oracle ojdbc10 driver into the
            #      connector's lib directory, then start Kafka Connect with
            #      /etc/confluent/docker/run.
            Command:
              - bash
              - "-c"
              - |
                export ENI_IP=`curl $ECS_CONTAINER_METADATA_URI_V4 | sed -e 's/.*IPv4Addresses":\["\(.*\)"\],"AttachmentIndex.*/\1/'` &&
                echo "$ENI_IP localhost" > /etc/hosts &&
                echo "export ENI_IP=$ENI_IP" >> /home/appuser/.bashrc
                runuser -p appuser -c '''
                  export HOME=/home/appuser &&
                  source /home/appuser/.bashrc
                  export CONNECT_REST_HOST_NAME=$ENI_IP &&
                  export CONNECT_REST_ADVERTISED_HOST_NAME=$ENI_IP &&
                  curl -L -O http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz &&
                  tar -xzvf confluent-hub-client-latest.tar.gz &&
                  confluent-hub install confluentinc/kafka-connect-jdbc:10.5.1 --no-prompt &&
                  curl -L -o /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/ojdbc10.jar  https://download.oracle.com/otn-pub/otn_software/jdbc/1916/ojdbc10.jar &&
                  /etc/confluent/docker/run
                '''
            # Kafka Connect worker settings. The group id and the three
            # internal storage topics are prefixed with topicNamespace and
            # include service and stage, so each stage gets its own set.
            Environment:
              - Name: CONNECT_BOOTSTRAP_SERVERS
                Value: >-
                  ${self:custom.brokerString}
              - Name: CONNECT_GROUP_ID
                Value: ${param:topicNamespace}mgmt.connect.${self:service}-${sls:stage}
              - Name: CONNECT_CONFIG_STORAGE_TOPIC
                Value: ${param:topicNamespace}mgmt.connect.${self:service}-${sls:stage}.config
              - Name: CONNECT_OFFSET_STORAGE_TOPIC
                Value: ${param:topicNamespace}mgmt.connect.${self:service}-${sls:stage}.offsets
              - Name: CONNECT_STATUS_STORAGE_TOPIC
                Value: ${param:topicNamespace}mgmt.connect.${self:service}-${sls:stage}.status
              - Name: CONNECT_OFFSET_STORAGE_PARTITIONS
                Value: 5
              - Name: CONNECT_STATUS_STORAGE_PARTITIONS
                Value: 1
              - Name: CONNECT_KEY_CONVERTER
                Value: org.apache.kafka.connect.json.JsonConverter
              - Name: CONNECT_VALUE_CONVERTER
                Value: org.apache.kafka.connect.json.JsonConverter
              - Name: CONNECT_INTERNAL_KEY_CONVERTER
                Value: org.apache.kafka.connect.json.JsonConverter
              - Name: CONNECT_INTERNAL_VALUE_CONVERTER
                Value: org.apache.kafka.connect.json.JsonConverter
              - Name: CONNECT_SECURITY_PROTOCOL
                Value: SSL
              # Producer/Consumer configs below
              # Thank you to https://github.com/confluentinc/kafka-connect-jdbc/issues/161
              - Name: CONNECT_PRODUCER_BOOTSTRAP_SERVERS
                Value: >-
                  ${self:custom.brokerString}
              - Name: CONNECT_PRODUCER_SECURITY_PROTOCOL
                Value: SSL
              - Name: CONNECT_CONSUMER_BOOTSTRAP_SERVERS
                Value: >-
                  ${self:custom.brokerString}
              - Name: CONNECT_CONSUMER_SECURITY_PROTOCOL
                Value: SSL
              - Name: CONNECT_PRODUCER_OFFSET_FLUSH_TIMEOUT_MS
                Value: 30000
            # Ship container output to the worker log group; the datetime
            # format tells awslogs where a new log event starts.
            LogConfiguration:
              LogDriver: awslogs
              Options:
                awslogs-region: !Sub "${AWS::Region}"
                awslogs-group: !Sub "${KafkaConnectWorkerLogGroup}"
                awslogs-stream-prefix: fargate
                awslogs-datetime-format: \[%Y-%m-%d %H:%M:%S,
          - Name: instantclient
            Image: "ghcr.io/oracle/oraclelinux8-instantclient:21"
            Memory: 2048 # This will kill the instantclient container if it goes over 2048, to preserve the connector container.
            # Keep the container alive doing nothing so it can be exec'd into.
            Command:
              - bash
              - "-c"
              - |
                sleep infinity
        Family: ${self:service}-${sls:stage}-kafka-connect-worker
        NetworkMode: awsvpc
        # The same role serves as execution role and task role.
        ExecutionRoleArn: !GetAtt KafkaConnectWorkerRole.Arn
        TaskRoleArn: !GetAtt KafkaConnectWorkerRole.Arn
        RequiresCompatibilities:
          - FARGATE
        Memory: 4GB
        Cpu: 2048
        # Referencing CleanupTopics in a tag makes this resource depend on it
        # when isDev is true; otherwise the tag is the literal "Blank".
        Tags:
          - Key: ConditionalDependencyHack # Ensures order based on a conditional resource
            Value:
              Fn::If:
                - isDev
                - !Ref CleanupTopics
                - Blank
    # ECS cluster hosting the Kafka Connect service, with Container Insights
    # turned on. The name matches what custom.scripts.commands looks up.
    KafkaConnectCluster:
      Type: AWS::ECS::Cluster
      Properties:
        ClusterName: "${self:service}-${sls:stage}-connect"
        ClusterSettings:
          - Value: enabled
            Name: containerInsights
    # Single-task Fargate service running the Kafka Connect task definition in
    # the data subnets, without a public IP and with ECS Exec enabled.
    KafkaConnectService:
      Type: AWS::ECS::Service
      Properties:
        ServiceName: kafka-connect
        Cluster: !Ref KafkaConnectCluster
        TaskDefinition: !Ref KafkaConnectWorkerTaskDefinition
        LaunchType: FARGATE
        DesiredCount: 1
        EnableExecuteCommand: true
        # At most 100% / at least 0% of DesiredCount during a deployment; the
        # circuit breaker is on but does not roll back.
        DeploymentConfiguration:
          MinimumHealthyPercent: 0
          MaximumPercent: 100
          DeploymentCircuitBreaker:
            Enable: true
            Rollback: false
        NetworkConfiguration:
          AwsvpcConfiguration:
            AssignPublicIp: DISABLED
            Subnets: ${self:custom.vpc.dataSubnets}
            SecurityGroups:
              - !Ref KafkaConnectWorkerSecurityGroup
    # EventBridge rule that matches "ECS Task State Change" events for the
    # Kafka Connect cluster where the task's lastStatus is STOPPED and the
    # stoppedReason is one of the two listed values, then sends a formatted
    # message to the ecsFailureTopicArn target.
    # NOTE(review): ecsFailureTopicArn is not defined in this file's params
    # block — presumably supplied at deploy time; verify.
    ECSFailureEventRule:
      Type: AWS::Events::Rule
      Properties:
        Description: "Connector Task Failure Event Rule"
        EventPattern:
          account:
            - !Sub "${AWS::AccountId}"
          source:
            - "aws.ecs"
            - "demo.cli" # used to test events from the command line
          detail-type:
            - "ECS Task State Change"
          detail:
            lastStatus:
              - "STOPPED"
            stoppedReason:
              - "Essential container in task exited"
              - "Task failed container health checks"
            clusterArn:
              - !GetAtt KafkaConnectCluster.Arn
        Targets:
          - Arn: ${param:ecsFailureTopicArn}
            Id: "ConnectorEcsTaskFailure"
            # Pull four fields out of the event and substitute them into the
            # <placeholders> of the message template below.
            InputTransformer:
              InputPathsMap:
                "clusterArn": "$.detail.clusterArn"
                "status": "$.detail.lastStatus"
                "account": "$.account"
                "stoppedReason": "$.detail.stoppedReason"
              InputTemplate: |
                "An Connector ECS Task Failure Event has occured for appian-connectors. Account: <account> Cluster ARN: <clusterArn> Status: <status> Reason: <stoppedReason>"
    # Alarms when the per-minute sum of the JDBC task-failure metric is >= 1
    # in at least 2 of the last 5 minutes.
    # NOTE(review): the namespace equals the "namespace" env value given to
    # testConnectors, which presumably publishes this metric — verify.
    JdbcTaskAlarm:
      Type: AWS::CloudWatch::Alarm
      Properties:
        Namespace: "${self:service}-${sls:stage}"
        MetricName: source.jdbc.appian-dbo-1_task_failures
        Statistic: Sum
        Period: 60
        EvaluationPeriods: 5
        DatapointsToAlarm: 2
        ComparisonOperator: GreaterThanOrEqualToThreshold
        Threshold: 1
        AlarmActions:
          - ${param:ecsFailureTopicArn}
    # Alarms when the per-minute sum of the JDBC connector-failure metric is
    # >= 1 in at least 2 of the last 5 minutes.
    # NOTE(review): the namespace equals the "namespace" env value given to
    # testConnectors, which presumably publishes this metric — verify.
    JdbcConnectorAlarm:
      Type: AWS::CloudWatch::Alarm
      Properties:
        Namespace: "${self:service}-${sls:stage}"
        MetricName: source.jdbc.appian-dbo-1_failures
        Statistic: Sum
        Period: 60
        EvaluationPeriods: 5
        DatapointsToAlarm: 2
        ComparisonOperator: GreaterThanOrEqualToThreshold
        Threshold: 1
        AlarmActions:
          - ${param:ecsFailureTopicArn}
    # Security group attached to the configureConnectors and testConnectors
    # functions; the worker security group admits it on port 8083.
    LambdaConfigureConnectorsSecurityGroup:
      Type: AWS::EC2::SecurityGroup
      Properties:
        VpcId: ${self:custom.vpc.id}
        GroupDescription: Security Group for configuring the connector.
    # Unrestricted outbound traffic: all protocols, any destination.
    LambdaConfigureConnectorsSecurityGroupEgress:
      Type: AWS::EC2::SecurityGroupEgress
      Properties:
        GroupId:
          Ref: LambdaConfigureConnectorsSecurityGroup
        CidrIp: 0.0.0.0/0
        IpProtocol: -1
    # Metric filter: emits 1 for every worker log event matching "ERROR"
    # (0 when nothing matches).
    ConnectorLogsErrorCount:
      Type: AWS::Logs::MetricFilter
      Properties:
        FilterName: ConnectorLogsErrorCount
        FilterPattern: ERROR
        LogGroupName: !Ref KafkaConnectWorkerLogGroup
        MetricTransformations:
          - MetricNamespace: "${self:service}-${sls:stage}/Connector/ERRORS"
            MetricName: ConnectorLogsErrorCount
            MetricValue: "1"
            DefaultValue: "0"
            Unit: Count
    # Alarms when the 5-minute error count is >= 1 in at least one of the
    # last two periods.
    ConnectorLogsErrorCountAlarm:
      Type: AWS::CloudWatch::Alarm
      Properties:
        AlarmName: "${self:service}-${sls:stage}-ConnectorLogsErrorCount"
        Namespace: "${self:service}-${sls:stage}/Connector/ERRORS"
        MetricName: ConnectorLogsErrorCount
        Statistic: Sum
        Period: 300
        EvaluationPeriods: 2
        DatapointsToAlarm: 1
        ComparisonOperator: GreaterThanOrEqualToThreshold
        Threshold: 1
        AlarmActions:
          - ${param:ecsFailureTopicArn}
    # Metric filter: emits 1 for every worker log event matching "WARN"
    # (0 when nothing matches).
    ConnectorLogsWarnCount:
      Type: AWS::Logs::MetricFilter
      Properties:
        FilterName: ConnectorLogsWarnCount
        FilterPattern: WARN
        LogGroupName: !Ref KafkaConnectWorkerLogGroup
        MetricTransformations:
          - MetricNamespace: "${self:service}-${sls:stage}/Connector/WARNS"
            MetricName: ConnectorLogsWarnCount
            MetricValue: "1"
            DefaultValue: "0"
            Unit: Count
    # Alarms when the 5-minute warning count is >= 3 in at least one of the
    # last two periods.
    ConnectorLogsWarnCountAlarm:
      Type: AWS::CloudWatch::Alarm
      Properties:
        AlarmName: "${self:service}-${sls:stage}-ConnectorLogsWarnCount"
        Namespace: "${self:service}-${sls:stage}/Connector/WARNS"
        MetricName: ConnectorLogsWarnCount
        Statistic: Sum
        Period: 300
        EvaluationPeriods: 2
        DatapointsToAlarm: 1
        ComparisonOperator: GreaterThanOrEqualToThreshold
        Threshold: 3
        AlarmActions:
          - ${param:ecsFailureTopicArn}
    # Alarms when the Kafka Connect service's average CPU utilization is
    # >= 75% for two consecutive 60-second periods. The same target
    # (ecsFailureTopicArn) is notified on entering ALARM and on returning to
    # OK, matching the memory alarm for this service.
    KafkaConnectServiceECSCpuAlarm:
      Type: AWS::CloudWatch::Alarm
      Properties:
        AlarmName: ${self:service}-${sls:stage}-KafkaConnectService-CPUUtilization
        AlarmDescription: Trigger an alarm when the CPU utilization reaches 75%
        Namespace: AWS/ECS
        MetricName: CPUUtilization
        Dimensions:
          - Name: ClusterName
            Value: !Ref KafkaConnectCluster
          - Name: ServiceName
            Value: !GetAtt KafkaConnectService.Name
        Statistic: Average
        Period: 60
        EvaluationPeriods: 2
        Threshold: 75
        ComparisonOperator: GreaterThanOrEqualToThreshold
        AlarmActions:
          - ${param:ecsFailureTopicArn}
        # Must be a plain ${param:...} reference so it resolves to the ARN.
        OKActions:
          - ${param:ecsFailureTopicArn}
    # Alarms when the Kafka Connect service's average memory utilization is
    # >= 75% for two consecutive 60-second periods; notifies the same target
    # on ALARM and on return to OK.
    KafkaConnectServiceECSMemoryAlarm:
      Type: AWS::CloudWatch::Alarm
      Properties:
        AlarmName: "${self:service}-${sls:stage}-KafkaConnectService-MemoryUtilization"
        AlarmDescription: Trigger an alarm when the Memory utilization reaches 75%
        Namespace: AWS/ECS
        MetricName: MemoryUtilization
        Dimensions:
          - Name: ClusterName
            Value:
              Ref: KafkaConnectCluster
          - Name: ServiceName
            Value:
              Fn::GetAtt: [KafkaConnectService, Name]
        Statistic: Average
        Period: 60
        EvaluationPeriods: 2
        ComparisonOperator: GreaterThanOrEqualToThreshold
        Threshold: 75
        AlarmActions:
          - ${param:ecsFailureTopicArn}
        OKActions:
          - ${param:ecsFailureTopicArn}

  # Values exported from the stack for use by other stacks/resources.
  Outputs:
    KafkaConnectWorkerSecurityGroupId:
      Description: |
        The ID of the security group attached to the Kafka Connect cluster tasks.
        This can be used by other resources to attach additional ingress rules.
      Value:
        Ref: KafkaConnectWorkerSecurityGroup