# src/services/connector/serverless.yml
# Serverless Framework service for the connector; the name is built from custom.project.
service: ${self:custom.project}-connector

frameworkVersion: '3'

# Package every function as its own artifact.
package:
  individually: true

plugins:
  - serverless-bundle
  - serverless-plugin-scripts
  - serverless-stack-termination-protection
  - '@stratiformdigital/serverless-iam-helper'
  - '@stratiformdigital/serverless-idempotency-helper'
  - '@stratiformdigital/serverless-online'
  - '@stratiformdigital/serverless-s3-security-helper'
# AWS provider settings and the IAM role statements for this service's functions.
provider:
  name: aws
  runtime: nodejs18.x
  region: ${env:REGION_A}
  stackTags:
    PROJECT: ${self:custom.project}
    SERVICE: ${self:service}
  iam:
    role:
      path: /delegatedadmin/developer/
      permissionsBoundary: arn:aws:iam::${aws:accountId}:policy/cms-cloud-admin/developer-boundary-policy
      statements:
        # Network-interface management; the functions below are all VPC-attached.
        - Effect: Allow
          Resource: '*'
          Action:
            - ec2:CreateNetworkInterface
            - ec2:DeleteNetworkInterface
            - ec2:DetachNetworkInterface
            - ec2:DescribeNetworkInterfaces
            - ec2:DescribeSecurityGroups
            - ec2:DescribeSubnets
            - ec2:DescribeVpcs
        # Read-only lookup of ECS tasks.
        - Effect: Allow
          Resource: '*'
          Action:
            - ecs:ListTasks
            - ecs:DescribeTasks
        # Publishing custom CloudWatch metrics.
        - Effect: Allow
          Resource: '*'
          Action:
            - cloudwatch:PutMetricData
# Values shared across this file through ${self:custom.*}, plus the plugin-script hooks/commands.
custom:
  project: ${env:PROJECT}
  serverlessTerminationProtection:
    stages: # Apply CloudFormation termination protection for these stages
      - master
      - val
      - production
  # vpc, brokerString and dbInfo each try the stage-specific secret first and fall back to the "default" one.
  vpc: ${ssm:/aws/reference/secretsmanager/${self:custom.project}/${sls:stage}/vpc, ssm:/aws/reference/secretsmanager/${self:custom.project}/default/vpc}
  brokerString: ${ssm:/aws/reference/secretsmanager/${self:custom.project}/${sls:stage}/brokerString, ssm:/aws/reference/secretsmanager/${self:custom.project}/default/brokerString}
  dbInfo: ${ssm:/aws/reference/secretsmanager/${self:custom.project}/${sls:stage}/dbInfo, ssm:/aws/reference/secretsmanager/${self:custom.project}/default/dbInfo}
  # Falls back to the public Confluent image when the secret lookup yields nothing.
  connectImage: ${ssm:/aws/reference/secretsmanager/ecr/images/${self:custom.project}/${self:service}, "confluentinc/cp-kafka-connect:6.0.9"}
  scripts:
    hooks:
      # After a deploy, invoke configureConnectors synchronously and abort if it reports a function error.
      deploy:finalize: |
        set -e
        call=`aws lambda invoke --region ${self:provider.region} --function-name ${self:service}-${sls:stage}-configureConnectors --invocation-type RequestResponse --log Tail /dev/stdout`
        # Plain `jq '.FunctionError'` exits 0 whether or not the key exists, so it can never signal failure.
        # -e ties the exit status to the result; -s slurps every JSON document in the captured output
        # (function payload and invoke metadata) so the check does not depend on their order.
        if echo "$call" | jq -e -s 'map(select(type == "object" and has("FunctionError"))) | length > 0' > /dev/null; then
          echo "$call"
          echo "The Lambda function did not succeed." && exit 1;
        fi
    commands:
      # Prints ready-to-paste commands for reaching each RUNNING task in the connect cluster.
      connect: |
        runningTasks=(`aws --region ${self:provider.region} ecs list-tasks --cluster ${self:service}-${sls:stage}-connect --desired-status RUNNING | jq -r ".taskArns[]"`)
        echo "\nThe following command(s) may be used to exec onto running fargate tasks. Note, if there are no commands shown, there are no runnings tasks:"
        for task in "${runningTasks[@]}"
        do
          echo """
          To forward the connector's Kafka Connect REST API to your localhost:8083, run:
          ecs-exec-pf -c ${self:service}-${sls:stage}-connect -t ${task##*/} -p 8083 -l 8083
          To conect to the connector, run:
          aws --region ${self:provider.region} ecs execute-command --cluster ${self:service}-${sls:stage}-connect --task ${task##*/} --container connect --interactive --command "/bin/sh"
          To connect to the sql instantclient, run:
          aws --region ${self:provider.region} ecs execute-command --cluster ${self:service}-${sls:stage}-connect --task ${task##*/} --container instantclient --interactive --command "/bin/sh"
          """
        done
# Per-stage parameters, read in this file as ${param:topicNamespace}.
# master/val/production use no topic prefix; every other stage gets a project/stage prefix.
# NOTE(review): ${param:ecsFailureTopicArn} is referenced under resources but has no entry
# here - presumably it is supplied from outside this file; confirm.
params:
  master:
    topicNamespace: ''
  val:
    topicNamespace: ''
  production:
    topicNamespace: ''
  default:
    topicNamespace: '--${self:custom.project}--${sls:stage}--'
# Lambda functions of this service. Each one is attached to the VPC's private subnets.
functions:
  # Its ARN is the ServiceToken of the CreateTopics custom resource (CreateTopicsLambdaFunction).
  createTopics:
    handler: handlers/createTopics.handler
    timeout: 300
    vpc:
      subnetIds: ${self:custom.vpc.privateSubnets}
      securityGroupIds:
        - !Ref LambdaSecurityGroup

  # Its ARN is the ServiceToken of the CleanupTopics custom resource (CleanupKafkaLambdaFunction).
  cleanupKafka:
    handler: handlers/cleanupKafka.handler
    timeout: 300
    vpc:
      subnetIds: ${self:custom.vpc.privateSubnets}
      securityGroupIds:
        - !Ref LambdaSecurityGroup

  # Invoked by the deploy:finalize hook in custom.scripts.
  configureConnectors:
    handler: handlers/configureConnectors.handler
    timeout: 300
    maximumRetryAttempts: 0
    vpc:
      subnetIds: ${self:custom.vpc.privateSubnets}
      securityGroupIds:
        - !Ref LambdaConfigureConnectorsSecurityGroup
    environment:
      cluster: !Ref KafkaConnectCluster
      service: !Ref KafkaConnectService
      topicNamespace: ${param:topicNamespace}
      # Connection details taken from the dbInfo secret.
      legacydbIp: ${self:custom.dbInfo.ip}
      legacydbPort: ${self:custom.dbInfo.port}
      legacyDb: ${self:custom.dbInfo.db}
      legacydbUser: ${self:custom.dbInfo.user}
      legacydbPassword: ${self:custom.dbInfo.password}
      legacyschema: ${self:custom.dbInfo.schema}

  # Runs every minute on a schedule.
  testConnectors:
    handler: handlers/testConnectors.handler
    timeout: 300
    events:
      - schedule: cron(0/1 * ? * * *)
    vpc:
      subnetIds: ${self:custom.vpc.privateSubnets}
      securityGroupIds:
        - !Ref LambdaConfigureConnectorsSecurityGroup
    environment:
      cluster: !Ref KafkaConnectCluster
      service: !Ref KafkaConnectService
      # Same value as the Namespace of JdbcTaskAlarm and JdbcConnectorAlarm.
      namespace: ${self:service}-${sls:stage}
# Raw CloudFormation that is merged into the generated stack.
resources:
  Conditions:
    # isDev holds whenever the stage's topicNamespace parameter is not the empty string.
    isDev: !Not
      - !Equals
        - ${param:topicNamespace}
        - ''
  Resources:
# Custom resource whose ServiceToken is CreateTopicsLambdaFunction; it passes the broker string and topic list.
CreateTopics:
  Type: AWS::CloudFormation::CustomResource
  Properties:
    ServiceToken:
      Fn::GetAtt: [CreateTopicsLambdaFunction, Arn]
    BrokerString: ${self:custom.brokerString}
    TopicsToCreate:
      - name: ${param:topicNamespace}aws.appian.cmcs.MCP_SPA_PCKG
# Custom resource whose ServiceToken is CleanupKafkaLambdaFunction; it passes the topic patterns to delete.
CleanupTopics:
  Type: AWS::CloudFormation::CustomResource
  Condition: isDev # We only clean up topics in lower environments where isDev is true
  Properties:
    ServiceToken:
      Fn::GetAtt: [CleanupKafkaLambdaFunction, Arn]
    BrokerString: ${self:custom.brokerString}
    TopicPatternsToDelete:
      - ${param:topicNamespace}aws.appian.cmcs*
      - ${param:topicNamespace}mgmt.connect.${self:service}*
# Security group attached to the createTopics and cleanupKafka functions.
LambdaSecurityGroup:
  Type: AWS::EC2::SecurityGroup
  Properties:
    VpcId: ${self:custom.vpc.id}
    GroupDescription: Security Group for the topics lambda function
# Egress rule for LambdaSecurityGroup: every protocol, any destination.
LambdaSecurityGroupEgress:
  Type: AWS::EC2::SecurityGroupEgress
  Properties:
    GroupId:
      Ref: LambdaSecurityGroup
    CidrIp: 0.0.0.0/0
    IpProtocol: -1
# Log group the connect container writes to (its awslogs-group) and the metric filters read from.
KafkaConnectWorkerLogGroup:
  Type: AWS::Logs::LogGroup
  Properties:
    LogGroupName: /aws/fargate/${self:service}-${sls:stage}-kafka-connect
# Security group of the Kafka Connect service tasks; its ID is exported under Outputs.
KafkaConnectWorkerSecurityGroup:
  Type: AWS::EC2::SecurityGroup
  Properties:
    VpcId: ${self:custom.vpc.id}
    GroupDescription: Security Group for the Fargate Connect Workers.
# Egress rule for the worker security group: every protocol, any destination.
KafkaConnectWorkerSecurityGroupEgressLambda:
  Type: AWS::EC2::SecurityGroupEgress
  Properties:
    GroupId:
      Ref: KafkaConnectWorkerSecurityGroup
    CidrIp: 0.0.0.0/0
    IpProtocol: -1
# Allows TCP 8083 into the workers from the configureConnectors/testConnectors security group.
KafkaConnectWorkerSecurityGroupIngressLambda:
  Type: AWS::EC2::SecurityGroupIngress
  Properties:
    GroupId: !Ref KafkaConnectWorkerSecurityGroup
    SourceSecurityGroupId: !Ref LambdaConfigureConnectorsSecurityGroup
    IpProtocol: tcp
    FromPort: 8083
    ToPort: 8083
# Allows TCP 8083 between members of the worker security group itself.
KafkaConnectWorkerSecurityGroupIngressCluster:
  Type: AWS::EC2::SecurityGroupIngress
  Properties:
    GroupId: !Ref KafkaConnectWorkerSecurityGroup
    SourceSecurityGroupId: !Ref KafkaConnectWorkerSecurityGroup
    IpProtocol: tcp
    FromPort: 8083
    ToPort: 8083
# IAM role the task definition uses as both its execution role and its task role.
KafkaConnectWorkerRole:
  Type: AWS::IAM::Role
  Properties:
    AssumeRolePolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Effect: Allow
          Action: sts:AssumeRole
          Principal:
            Service:
              - ecs.amazonaws.com
              - ecs-tasks.amazonaws.com
    ManagedPolicyArns:
      - arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy
    Policies:
      - PolicyName: LambdaRolePolicy
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
            # SSM message channels (presumably for ECS Exec, which the service enables - confirm).
            - Effect: Allow
              Resource: '*'
              Action:
                - ssmmessages:CreateControlChannel
                - ssmmessages:CreateDataChannel
                - ssmmessages:OpenControlChannel
                - ssmmessages:OpenDataChannel
            # Image pulls from any ECR repository in this account and region.
            - Effect: Allow
              Resource: !Sub 'arn:aws:ecr:${self:provider.region}:${AWS::AccountId}:repository/*'
              Action:
                - ecr:BatchGetImage
# Fargate task definition with two containers: "connect" (the Kafka Connect worker)
# and "instantclient" (an Oracle instant client image that just sleeps).
KafkaConnectWorkerTaskDefinition:
Type: "AWS::ECS::TaskDefinition"
Properties:
ContainerDefinitions:
# Kafka Connect worker container.
- Name: connect
Image: ${self:custom.connectImage}
Memory: 4096
Cpu: 2048
User: "root"
# Startup script: reads the task's IPv4 address from the ECS metadata endpoint, maps it to
# "localhost" in /etc/hosts, then as appuser advertises that address for the Connect REST API,
# installs the JDBC connector plus the Oracle driver, and finally starts the worker.
# NOTE(review): the Confluent Hub client is fetched over plain http and both downloads happen
# at every container start - consider https and/or baking them into the image.
Command:
- bash
- "-c"
- |
export ENI_IP=`curl $ECS_CONTAINER_METADATA_URI_V4 | sed -e 's/.*IPv4Addresses":\["\(.*\)"\],"AttachmentIndex.*/\1/'` &&
echo "$ENI_IP localhost" > /etc/hosts &&
echo "export ENI_IP=$ENI_IP" >> /home/appuser/.bashrc
runuser -p appuser -c '''
export HOME=/home/appuser &&
source /home/appuser/.bashrc
export CONNECT_REST_HOST_NAME=$ENI_IP &&
export CONNECT_REST_ADVERTISED_HOST_NAME=$ENI_IP &&
curl -L -O http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz &&
tar -xzvf confluent-hub-client-latest.tar.gz &&
confluent-hub install confluentinc/kafka-connect-jdbc:10.5.1 --no-prompt &&
curl -L -o /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/ojdbc10.jar https://download.oracle.com/otn-pub/otn_software/jdbc/1916/ojdbc10.jar &&
/etc/confluent/docker/run
'''
# Worker settings. Group id and storage topics carry the stage's topicNamespace prefix,
# which the second CleanupTopics pattern (…mgmt.connect.<service>*) also covers.
Environment:
- Name: CONNECT_BOOTSTRAP_SERVERS
Value: >-
${self:custom.brokerString}
- Name: CONNECT_GROUP_ID
Value: ${param:topicNamespace}mgmt.connect.${self:service}-${sls:stage}
- Name: CONNECT_CONFIG_STORAGE_TOPIC
Value: ${param:topicNamespace}mgmt.connect.${self:service}-${sls:stage}.config
- Name: CONNECT_OFFSET_STORAGE_TOPIC
Value: ${param:topicNamespace}mgmt.connect.${self:service}-${sls:stage}.offsets
- Name: CONNECT_STATUS_STORAGE_TOPIC
Value: ${param:topicNamespace}mgmt.connect.${self:service}-${sls:stage}.status
- Name: CONNECT_OFFSET_STORAGE_PARTITIONS
Value: 5
- Name: CONNECT_STATUS_STORAGE_PARTITIONS
Value: 1
- Name: CONNECT_KEY_CONVERTER
Value: org.apache.kafka.connect.json.JsonConverter
- Name: CONNECT_VALUE_CONVERTER
Value: org.apache.kafka.connect.json.JsonConverter
- Name: CONNECT_INTERNAL_KEY_CONVERTER
Value: org.apache.kafka.connect.json.JsonConverter
- Name: CONNECT_INTERNAL_VALUE_CONVERTER
Value: org.apache.kafka.connect.json.JsonConverter
- Name: CONNECT_SECURITY_PROTOCOL
Value: SSL
# Producer/Consumer configs below
# Thank you to https://github.com/confluentinc/kafka-connect-jdbc/issues/161
- Name: CONNECT_PRODUCER_BOOTSTRAP_SERVERS
Value: >-
${self:custom.brokerString}
- Name: CONNECT_PRODUCER_SECURITY_PROTOCOL
Value: SSL
- Name: CONNECT_CONSUMER_BOOTSTRAP_SERVERS
Value: >-
${self:custom.brokerString}
- Name: CONNECT_CONSUMER_SECURITY_PROTOCOL
Value: SSL
- Name: CONNECT_PRODUCER_OFFSET_FLUSH_TIMEOUT_MS
Value: 30000
# Ships container output to KafkaConnectWorkerLogGroup, which the metric filters read.
LogConfiguration:
LogDriver: awslogs
Options:
awslogs-region: !Sub "${AWS::Region}"
awslogs-group: !Sub "${KafkaConnectWorkerLogGroup}"
awslogs-stream-prefix: fargate
awslogs-datetime-format: \[%Y-%m-%d %H:%M:%S,
# Sidecar that only sleeps; the `connect` plugin command prints how to exec into it.
- Name: instantclient
Image: "ghcr.io/oracle/oraclelinux8-instantclient:21"
Memory: 2048 # This will kill the instantclient container if it goes over 2048, to preserve the connector container.
Command:
- bash
- "-c"
- |
sleep infinity
# Task-level settings; the same role serves as execution role and task role.
Family: ${self:service}-${sls:stage}-kafka-connect-worker
NetworkMode: awsvpc
ExecutionRoleArn: !GetAtt KafkaConnectWorkerRole.Arn
TaskRoleArn: !GetAtt KafkaConnectWorkerRole.Arn
RequiresCompatibilities:
- FARGATE
Memory: 4GB
Cpu: 2048
# The tag value references CleanupTopics only when isDev is true, and is the literal "Blank" otherwise.
Tags:
- Key: ConditionalDependencyHack # Ensures order based on a conditional resource
Value:
Fn::If:
- isDev
- !Ref CleanupTopics
- Blank
# ECS cluster for the connector; the name matches the --cluster used by the scripts in custom.scripts.
KafkaConnectCluster:
  Type: AWS::ECS::Cluster
  Properties:
    ClusterName: ${self:service}-${sls:stage}-connect
    ClusterSettings:
      - Value: enabled
        Name: containerInsights
# Fargate service running a single Kafka Connect worker task in the data subnets, without a public IP.
KafkaConnectService:
  Type: AWS::ECS::Service
  Properties:
    ServiceName: kafka-connect
    Cluster: !Ref KafkaConnectCluster
    TaskDefinition: !Ref KafkaConnectWorkerTaskDefinition
    LaunchType: FARGATE
    DesiredCount: 1
    EnableExecuteCommand: true
    DeploymentConfiguration:
      MaximumPercent: 100
      MinimumHealthyPercent: 0
      # Circuit breaker is on, but without automatic rollback.
      DeploymentCircuitBreaker:
        Enable: true
        Rollback: false
    NetworkConfiguration:
      AwsvpcConfiguration:
        AssignPublicIp: DISABLED
        Subnets: ${self:custom.vpc.dataSubnets}
        SecurityGroups:
          - !Ref KafkaConnectWorkerSecurityGroup
# Event rule matching STOPPED-task state changes in the connect cluster; it sends a formatted
# message to the ecsFailureTopicArn target.
ECSFailureEventRule:
  Type: AWS::Events::Rule
  Properties:
    Description: Connector Task Failure Event Rule
    EventPattern:
      account:
        - !Ref AWS::AccountId
      source:
        - aws.ecs
        - demo.cli # used to test events from the command line
      detail-type:
        - ECS Task State Change
      detail:
        clusterArn:
          - !GetAtt KafkaConnectCluster.Arn
        lastStatus:
          - STOPPED
        stoppedReason:
          - Essential container in task exited
          - Task failed container health checks
    Targets:
      - Id: ConnectorEcsTaskFailure
        Arn: ${param:ecsFailureTopicArn}
        InputTransformer:
          # Event fields substituted into the <placeholders> of InputTemplate.
          InputPathsMap:
            account: $.account
            clusterArn: $.detail.clusterArn
            status: $.detail.lastStatus
            stoppedReason: $.detail.stoppedReason
          InputTemplate: |
            "An Connector ECS Task Failure Event has occured for appian-connectors. Account: <account> Cluster ARN: <clusterArn> Status: <status> Reason: <stoppedReason>"
# Alarms when the task-failure metric sums to >= 1 in at least 2 of 5 one-minute periods.
JdbcTaskAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    Namespace: ${self:service}-${sls:stage}
    MetricName: source.jdbc.appian-dbo-1_task_failures
    Statistic: Sum
    Period: 60
    EvaluationPeriods: 5
    DatapointsToAlarm: 2
    ComparisonOperator: GreaterThanOrEqualToThreshold
    Threshold: 1
    AlarmActions:
      - ${param:ecsFailureTopicArn}
# Alarms when the connector-failure metric sums to >= 1 in at least 2 of 5 one-minute periods.
JdbcConnectorAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    Namespace: ${self:service}-${sls:stage}
    MetricName: source.jdbc.appian-dbo-1_failures
    Statistic: Sum
    Period: 60
    EvaluationPeriods: 5
    DatapointsToAlarm: 2
    ComparisonOperator: GreaterThanOrEqualToThreshold
    Threshold: 1
    AlarmActions:
      - ${param:ecsFailureTopicArn}
# Security group of configureConnectors and testConnectors; it is the allowed source for worker port 8083.
LambdaConfigureConnectorsSecurityGroup:
  Type: AWS::EC2::SecurityGroup
  Properties:
    VpcId: ${self:custom.vpc.id}
    GroupDescription: Security Group for configuring the connector.
# Egress rule for LambdaConfigureConnectorsSecurityGroup: every protocol, any destination.
LambdaConfigureConnectorsSecurityGroupEgress:
  Type: AWS::EC2::SecurityGroupEgress
  Properties:
    GroupId:
      Ref: LambdaConfigureConnectorsSecurityGroup
    CidrIp: 0.0.0.0/0
    IpProtocol: -1
# Metric filter: each worker log event matching "ERROR" counts 1; the default value is 0.
ConnectorLogsErrorCount:
  Type: AWS::Logs::MetricFilter
  Properties:
    FilterName: ConnectorLogsErrorCount
    LogGroupName: !Ref KafkaConnectWorkerLogGroup
    FilterPattern: ERROR
    MetricTransformations:
      - MetricNamespace: ${self:service}-${sls:stage}/Connector/ERRORS
        MetricName: ConnectorLogsErrorCount
        MetricValue: '1'
        DefaultValue: '0'
        Unit: Count
# Alarms when the ERROR count sums to >= 1 in at least 1 of 2 five-minute periods.
ConnectorLogsErrorCountAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmName: ${self:service}-${sls:stage}-ConnectorLogsErrorCount
    Namespace: ${self:service}-${sls:stage}/Connector/ERRORS
    MetricName: ConnectorLogsErrorCount
    Statistic: Sum
    Period: 300
    EvaluationPeriods: 2
    DatapointsToAlarm: 1
    ComparisonOperator: GreaterThanOrEqualToThreshold
    Threshold: 1
    AlarmActions:
      - ${param:ecsFailureTopicArn}
# Metric filter: each worker log event matching "WARN" counts 1; the default value is 0.
ConnectorLogsWarnCount:
  Type: AWS::Logs::MetricFilter
  Properties:
    FilterName: ConnectorLogsWarnCount
    LogGroupName: !Ref KafkaConnectWorkerLogGroup
    FilterPattern: WARN
    MetricTransformations:
      - MetricNamespace: ${self:service}-${sls:stage}/Connector/WARNS
        MetricName: ConnectorLogsWarnCount
        MetricValue: '1'
        DefaultValue: '0'
        Unit: Count
# Alarms when the WARN count sums to >= 3 in at least 1 of 2 five-minute periods.
ConnectorLogsWarnCountAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmName: ${self:service}-${sls:stage}-ConnectorLogsWarnCount
    Namespace: ${self:service}-${sls:stage}/Connector/WARNS
    MetricName: ConnectorLogsWarnCount
    Statistic: Sum
    Period: 300
    EvaluationPeriods: 2
    DatapointsToAlarm: 1
    ComparisonOperator: GreaterThanOrEqualToThreshold
    Threshold: 3
    AlarmActions:
      - ${param:ecsFailureTopicArn}
# Alarms when the service's average CPU utilization is >= 75% over 2 one-minute evaluation periods.
# Both the ALARM and the OK transitions notify the ecsFailureTopicArn topic.
KafkaConnectServiceECSCpuAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmName: ${self:service}-${sls:stage}-KafkaConnectService-CPUUtilization
    AlarmDescription: Trigger an alarm when the CPU utilization reaches 75%
    Namespace: AWS/ECS
    MetricName: CPUUtilization
    Dimensions:
      - Name: ClusterName
        Value: !Ref KafkaConnectCluster
      - Name: ServiceName
        Value: !GetAtt KafkaConnectService.Name
    Statistic: Average
    Period: 60
    EvaluationPeriods: 2
    Threshold: 75
    ComparisonOperator: GreaterThanOrEqualToThreshold
    AlarmActions:
      - ${param:ecsFailureTopicArn}
    OKActions:
      # Was "$${param:...}": the stray "$" kept this from resolving to the bare topic ARN,
      # unlike the identical setting on KafkaConnectServiceECSMemoryAlarm.
      - ${param:ecsFailureTopicArn}
# Alarms when the service's average memory utilization is >= 75% over 2 one-minute evaluation periods.
# Both the ALARM and the OK transitions notify the ecsFailureTopicArn topic.
KafkaConnectServiceECSMemoryAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmName: ${self:service}-${sls:stage}-KafkaConnectService-MemoryUtilization
    AlarmDescription: Trigger an alarm when the Memory utilization reaches 75%
    Namespace: AWS/ECS
    MetricName: MemoryUtilization
    Statistic: Average
    Dimensions:
      - Name: ClusterName
        Value:
          Ref: KafkaConnectCluster
      - Name: ServiceName
        Value:
          Fn::GetAtt: [KafkaConnectService, Name]
    ComparisonOperator: GreaterThanOrEqualToThreshold
    Threshold: 75
    Period: 60
    EvaluationPeriods: 2
    OKActions:
      - ${param:ecsFailureTopicArn}
    AlarmActions:
      - ${param:ecsFailureTopicArn}
# Stack outputs exposed to other stacks and tools.
Outputs:
  KafkaConnectWorkerSecurityGroupId:
    Value:
      Ref: KafkaConnectWorkerSecurityGroup
    Description: |
      The ID of the security group attached to the Kafka Connect cluster tasks.
      This can be used by other resources to attach additional ingress rules.