modules/mu/providers/aws/msg_queue.rb
# Copyright:: Copyright (c) 2018 eGlobalTech, Inc., all rights reserved
#
# Licensed under the BSD-3 license (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License in the root of the project or at
#
# http://egt-labs.com/mu/LICENSE.html
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
module MU
class Cloud
class AWS
# A MsgQueue as configured in {MU::Config::BasketofKittens::msg_queues}
class MsgQueue < MU::Cloud::MsgQueue
# Initialize this cloud resource object. Calling +super+ will invoke the initializer defined under {MU::Cloud}, which should set the attribtues listed in {MU::Cloud::PUBLIC_ATTRS} as well as applicable dependency shortcuts, like +@vpc+, for us.
# @param args [Hash]: Hash of named arguments passed via Ruby's double-splat
def initialize(**args)
super
@mu_name ||= @deploy.getResourceName(@config["name"])
end
# Called automatically by {MU::Deploy#createResources}
def create
attrs = genQueueAttrs
namestr = @mu_name
namestr += ".fifo" if attrs['FifoQueue']
MU.log "Creating SQS queue #{namestr}", details: attrs
resp = MU::Cloud::AWS.sqs(region: @region, credentials: @credentials).create_queue(
queue_name: namestr,
attributes: attrs
)
sleep 1
MU.log "SQS queue #{@config['name']} is at: #{resp.queue_url}", MU::SUMMARY
@cloud_id = resp.queue_url
end
# Called automatically by {MU::Deploy#createResources}
def groom
tagQueue
cur_attrs = notify
# if cur_attrs["Policy"]
# MU.log "FECK", MU::WARN, details: JSON.parse(cur_attrs["Policy"]).to_yaml
# end
new_attrs = genQueueAttrs
changed = false
new_attrs.each_pair { |k, _v|
if !cur_attrs.has_key?(k) or cur_attrs[k] != new_attrs[k]
changed = true
end
}
if changed
MU.log "Updating SQS queue #{@mu_name}", MU::NOTICE, details: new_attrs
MU::Cloud::AWS.sqs(region: @region, credentials: @credentials).set_queue_attributes(
queue_url: @cloud_id,
attributes: new_attrs
)
end
end
# Canonical Amazon Resource Number for this resource
# @return [String]
def arn
"arn:"+(MU::Cloud::AWS.isGovCloud?(@region) ? "aws-us-gov" : "aws")+":sqs:"+@region+":"+MU::Cloud::AWS.credToAcct(@credentials)+":"+@cloud_id
end
@cloud_desc_cache = nil
# Retrieve the AWS descriptor for this SQS queue. AWS doesn't exactly
# provide one; if you want real information for SQS ask notify()
# @return [Hash]: AWS doesn't return anything but the SQS URL, so supplement with attributes
def cloud_desc(use_cache: true)
return @cloud_desc_cache if @cloud_desc_cache and use_cache
return nil if !@cloud_id
if !@cloud_id
resp = MU::Cloud::AWS.sqs(region: @region, credentials: @credentials).list_queues(
queue_name_prefix: @mu_name
)
return nil if !resp or !resp.queue_urls
resp.queue_urls.each { |url|
if url.match(/\/#{Regexp.quote(@mu_name)}$/)
@cloud_id ||= url
break
end
}
end
return nil if !@cloud_id
@cloud_desc_cache = MU::Cloud::AWS::MsgQueue.find(
cloud_id: @cloud_id.dup,
region: @region,
credentials: @credentials
)
@cloud_desc_cache
end
# Return the metadata for this MsgQueue rule
# @return [Hash]
def notify
cloud_desc
deploy_struct = MU::Cloud::AWS::MsgQueue.find(
cloud_id: @cloud_id,
region: @region,
credentials: @credentials
)
return deploy_struct
end
# Does this resource type exist as a global (cloud-wide) artifact, or
# is it localized to a region/zone?
# @return [Boolean]
def self.isGlobal?
false
end
# Denote whether this resource implementation is experiment, ready for
# testing, or ready for production use.
def self.quality
MU::Cloud::RELEASE
end
# Remove all msg_queues associated with the currently loaded deployment.
# @param noop [Boolean]: If true, will only print what would be done
# @param ignoremaster [Boolean]: If true, will remove resources not flagged as originating from this Mu server
# @param region [String]: The cloud provider region
# @return [void]
def self.cleanup(noop: false, deploy_id: MU.deploy_id, ignoremaster: false, region: MU.curRegion, credentials: nil, flags: {})
MU.log "AWS::MsgQueue.cleanup: need to support flags['known']", MU::DEBUG, details: flags
MU.log "Placeholder: AWS MsgQueue artifacts do not support tags, so ignoremaster cleanup flag has no effect", MU::DEBUG, details: ignoremaster
resp = MU::Cloud::AWS.sqs(credentials: credentials, region: region).list_queues(
queue_name_prefix: deploy_id
)
if resp and resp.queue_urls
threads = []
resp.queue_urls.each { |url|
threads << Thread.new {
MU.log "Deleting SQS queue #{url}"
if !noop
MU::Cloud::AWS.sqs(credentials: credentials, region: region).delete_queue(
queue_url: url
)
sleep 60 # per API docs, this is how long it takes to really delete
end
}
}
threads.each { |t|
t.join
}
end
end
# Locate an existing msg_queue.
# @return [Hash]: AWS doesn't return anything but the SQS URL, so supplement with attributes
def self.find(**args)
args[:flags] ||= {}
args[:flags]['account'] ||= MU.account_number
found = {}
# If it's a URL, make sure it's good
begin
if args[:cloud_id]
if args[:cloud_id].match(/^https?:/i)
resp = MU::Cloud::AWS.sqs(region: args[:region], credentials: args[:credentials]).get_queue_attributes(
queue_url: args[:cloud_id],
attribute_names: ["All"]
)
if resp and resp.attributes
desc = resp.attributes.dup
desc["Url"] = args[:cloud_id]
found[args[:cloud_id]] = desc
return found
end
else
# If it's a plain queue name, resolve it to a URL
resp = MU::Cloud::AWS.sqs(region: args[:region], credentials: args[:credentials]).get_queue_url(
queue_name: args[:cloud_id],
queue_owner_aws_account_id: args[:flags]['account']
)
args[:cloud_id] = resp.queue_url if resp and resp.queue_url
end
end
rescue ::Aws::SQS::Errors::NonExistentQueue
end
# Go fetch its attributes
fetch = if args[:cloud_id]
if args[:cloud_id] !~ /^https?:\/\//
[begin
MU::Cloud::AWS.sqs(region: args[:region], credentials: args[:credentials]).get_queue_url(queue_name: args[:cloud_id]).queue_url
rescue Aws::SQS::Errors::NonExistentQueue
return found
end]
else
[args[:cloud_id]]
end
else
resp = MU::Cloud::AWS.sqs(region: args[:region], credentials: args[:credentials]).list_queues
resp.queue_urls
end
if fetch
fetch.each { |url|
resp = MU::Cloud::AWS.sqs(region: args[:region], credentials: args[:credentials]).get_queue_attributes(
queue_url: url,
attribute_names: ["All"]
)
if resp and resp.attributes
desc = resp.attributes.dup
desc["Url"] = url
found[url] = desc
end
}
end
found
end
# Cloud-specific configuration properties.
# @param _config [MU::Config]: The calling MU::Config object
# @return [Array<Array,Hash>]: List of required fields, and json-schema Hash of cloud-specific configuration parameters for this resource
def self.schema(_config)
toplevel_required = []
schema = {
"max_msg_size" => {
"type" => "integer",
"description" => "Maximum size of messages in this queue, in kB. Must be between 1 and 256.",
"default" => 256
},
"retain" => {
"type" => "string",
"description" => "The length of time for which Amazon SQS retains a message. Assumed to be in seconds, unless you specify a string like '4d' or 'five hours'. Must be between 1 minute and 14 days.",
"default" => "4 days"
},
"delay" => {
"type" => "string",
"description" => "Delay delivery by up to 15 minutes. You can specify a string like '1m' or '600 seconds'.",
"default" => "0 seconds"
},
"receive_timeout" => {
"type" => "string",
"description" => "The length of time, for which a ReceiveMessage action waits for a message to arrive, between 0 and 20 seconds. You can specify a string like '5s' or '20 seconds'.",
"default" => "0 seconds"
},
"visibility_timeout" => {
"type" => "string",
"description" => "The length of time during which Amazon SQS prevents other consumers from receiving and processing a message after another consumer has received it. Must be between 0 seconds and 12 hours. You can specify a string like '5 minutes' or '3 hours'. See also: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html",
"default" => "30 seconds"
},
"fifo" => {
"type" => "boolean",
"description" => "Designate this queue as a FIFO queue. Messages in this queue must explicitly specify MessageGroupId. This cannot be changed once instantiated. This feature is not available in all regions. See also: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html#FIFO-queues-understanding-logic",
"default" => false
},
"dedup" => {
"type" => "boolean",
"description" => "Enables content-based deduplication. When ContentBasedDeduplication is in effect, messages with identical content sent within the deduplication interval are treated as duplicates and only one copy of the message is delivered. This feature is not available in all regions. See also: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html#FIFO-queues-exactly-once-processing",
"default" => false
},
"failqueue" => {
"type" => "object",
"description" => "Target queue for messages that can't be processed (consumed) successfully. See also: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html",
"properties" => {
"create" => {
"type" => "boolean",
"description" => "Create a separate MsgQueue on the fly."
},
"retries_before_fail" => {
"type" => "integer",
"description" => "Number of times a message should fail before being sent to this queue. Must be between 1 and 1000.",
"default" => 10
},
"name" => {
"type" => "string",
"description" => "The name of a sibling SQS resource in this deploy, or the cloud identifier or URL of a pre-existing one"
}
}
},
# TODO this doesn't work as either an ARN, short identifier, or full JSON policy descriptor. Docs are vague. Need to ask AWS.
# "iam_policy" => {
# "type" => "string",
# "description" => "An IAM policy document for access to this SQS queue. Our parser expects this to be defined inline like the rest of your YAML/JSON Basket of Kittens, not as raw JSON. For guidance on SQS IAM capabilities, see: https://docs.aws.amazon.com/IAM/latest/UserGuide/list_amazonsqs.html"
# },
"kms" => {
"type" => "object",
"description" => "Use an Amazon KMS key to encrypt and decrypt messages in the background. This feature is not available in all regions. https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-server-side-encryption.html#sqs-sse-key-terms",
"required" => ["key_id", "key_reuse_period"],
"properties" => {
"key_id" => {
"type" => "string",
"description" => "KMS key to use for encryption and decryption"
},
"key_reuse_period" => {
"type" => "string",
"description" => "The length of time, in seconds, for which Amazon SQS can reuse a data key to encrypt or decrypt messages before calling AWS KMS again. You can specify a string like '5m' or '2 hours'.",
"default" => "5 minutes"
}
}
}
}
[toplevel_required, schema]
end
# Cloud-specific pre-processing of {MU::Config::BasketofKittens::msg_queues}, bare and unvalidated.
# @param queue [Hash]: The resource to process and validate
# @param configurator [MU::Config]: The overall deployment configurator of which this resource is a member
# @return [Boolean]: True if validation succeeded, False otherwise
def self.validateConfig(queue, configurator)
ok = true
if queue['failqueue']
if (!queue['failqueue']['create'] and !queue['failqueue'].has_key?("name")) or
(queue['failqueue']['create'] and queue['failqueue']['name'])
MU.log "Must set exactly one of 'create' or 'failqueue' in MsgQueue #{queue['name']}.", MU::ERR
ok = false
end
if queue['failqueue']['retries_before_fail'] < 1 or
queue['failqueue']['retries_before_fail'] > 1000
MU.log "'retries_before_fail' must be between 1 and 1000 in MsgQueue #{queue['name']}.", MU::ERR
ok = false
end
if queue['failqueue']['create']
failq = queue.dup
failq['name'] += "-fail"
failq.delete("failqueue")
ok = false if !configurator.insertKitten(failq, "msg_queues")
queue['failqueue']['name'] = failq['name']
MU::Config.addDependency(queue, failq["name"], "msg_queue")
else
if configurator.haveLitterMate?(queue['failqueue']['name'], "msg_queue")
MU::Config.addDependency(queue, queue['failqueue']['name'], "msg_queue")
else
failq = MU::Cloud::AWS::MsgQueue.find(cloud_id: queue['failqueue']['name'])
if !failq
MU.log "Could not find an SQS queue named #{queue['failqueue']['name']} for failqueue in MsgQueue '#{queue['name']}'", MU::ERR
ok = false
end
end
end
end
if queue['max_msg_size'] < 1 or queue['max_msg_size'] > 256
MU.log "Must specify a 'max_msg_size' value between 1 and 256 in MsgQueue #{queue['name']}.", MU::ERR
ok = false
end
queue['max_msg_size'] *= 1024 # the API takes it in bytes
queue['retain'] = ChronicDuration.parse(queue['retain'], :keep_zero => true)
if !queue['retain'] or queue['retain'] < 60 or queue['retain'] > 1209600
MU.log "Must specify a 'retain' value between 1 minute and 14 days in MsgQueue #{queue['name']}.", MU::ERR
ok = false
end
queue['delay'] = ChronicDuration.parse(queue['delay'], :keep_zero => true)
if !queue['delay'] or queue['delay'] < 0 or queue['delay'] > 900
MU.log "'delay' value must be between 0 seconds and 15 minutes in MsgQueue #{queue['name']}.", MU::ERR
ok = false
end
queue['receive_timeout'] = ChronicDuration.parse(queue['receive_timeout'], :keep_zero => true)
if !queue['receive_timeout'] or queue['receive_timeout'] < 0 or queue['receive_timeout'] > 20
MU.log "'receive_timeout' value must be between 0 seconds and 20 seconds in MsgQueue #{queue['name']}.", MU::ERR
ok = false
end
queue['visibility_timeout'] = ChronicDuration.parse(queue['visibility_timeout'], :keep_zero => true)
if !queue['visibility_timeout'] or queue['visibility_timeout'] < 0 or queue['visibility_timeout'] > 43200
MU.log "'visibility_timeout' value must be between 0 seconds and 12 hours in MsgQueue #{queue['name']}.", MU::ERR
ok = false
end
if queue['kms']
good_regions = ["us-east-1", "us-east-2", "us-west-2"]
if !good_regions.include?(queue['region'])
MU.log "KMS SQS encryption isn't supported in all regions, and #{queue['region']} wasn't on the list last we checked. Queue '#{queue['name']}' may not work.", MU::WARN, details: good_regions
end
queue['kms']['key_reuse_period'] = ChronicDuration.parse(queue['kms']['key_reuse_period'], :keep_zero => true)
if !queue['kms']['key_reuse_period'] or queue['kms']['key_reuse_period'] < 60 or queue['kms']['key_reuse_period'] > 86400
MU.log "KMS 'visibility_period' value must be between 60 seconds and 24 hours in MsgQueue #{queue['name']}.", MU::ERR
ok = false
end
begin
MU::Cloud::AWS.kms(region: queue['region']).describe_key(key_id: queue['kms']['key_id'])
rescue Aws::KMS::Errors::NotFoundException
MU.log "KMS key '#{queue['kms']['key_id']}' specified in Queue '#{queue['name']}' was not found.", MU::ERR, details: "Key IDs are of the form bf64a093-2c3d-46fa-0d4f-8232fa7ed53. Keys can be created at https://console.aws.amazon.com/iam/home#/encryptionKeys/#{queue['region']}"
ok = false
end
end
good_regions = ["us-east-1", "us-east-2", "us-west-2", "eu-west-1"]
if (queue['fifo'] or queue['dedup']) and !good_regions.include?(queue['region'])
MU.log "Fifo queues aren't supported in all regions, and #{queue['region']} wasn't on the list last we checked. MsgQueue '#{queue['name']}' may not work.", MU::WARN, details: good_regions
end
# TODO have IAM API validate queue['iam_policy'] if any is set
ok
end
private
def genQueueAttrs
attrs = {
"MaximumMessageSize" => @config['max_msg_size'].to_s,
"MessageRetentionPeriod" => @config['retain'].to_s,
"DelaySeconds" => @config['delay'].to_s,
"ReceiveMessageWaitTimeSeconds" => @config['receive_timeout'].to_s
}
if @config['failqueue']
sibling = @deploy.findLitterMate(type: "msg_queue", name: @config['failqueue']['name'])
id = @config['failqueue']['name']
if sibling # resolve sibling queues to something useful
id = sibling.cloud_id
end
desc = MU::Cloud::AWS::MsgQueue.find(cloud_id: id, credentials: @credentials)
if !desc
raise MuError, "Failed to get cloud descriptor for SQS queue #{@config['failqueue']['name']}"
end
rdr_pol = {
"deadLetterTargetArn" => desc["QueueArn"],
"maxReceiveCount" => @config['failqueue']['retries_before_fail']
}
attrs["RedrivePolicy"] = JSON.generate(rdr_pol)
end
# These aren't supported in most regions, and will fail loudly and
# spectacularly if you try to use them in the forbidden lands.
if @config['fifo'] or @config['dedup']
attrs["FifoQueue"] = "true" # dedup enables fifo implicitly
attrs["ContentBasedDeduplication"] = @config['dedup'].to_s
end
if @config['kms']
attrs["KmsMasterKeyId"] = @config['kms']['key_id'].to_s
attrs["KmsDataKeyReusePeriodSeconds"] = @config['kms']['key_reuse_period'].to_s
end
# TODO this doesn't work as either an ARN, short identifier, or full JSON policy descriptor. Docs are vague. Need to ask AWS.
# if @config['iam_policy']
# attrs["Policy"] = JSON.generate(@config['iam_policy'])
# end
attrs
end
def tagQueue(url = nil)
tags = {}
tags["Name"] = @mu_name
MU::MommaCat.listStandardTags.each_pair { |name, value|
tags[name] = value
}
if @config['optional_tags']
MU::MommaCat.listOptionalTags.each_pair { |name, value|
tags[name] = value
}
end
if @config['tags']
@config['tags'].each { |tag|
tags[tag['key']] = tag['value']
}
end
if !url
desc = cloud_desc
url = desc["Url"]
if !url
raise MU::MuError, "Can't tag SQS queue, failed to retrieve queue_url"
end
end
begin
MU::Cloud::AWS.sqs(region: @region, credentials: @credentials).tag_queue(
queue_url: url,
tags: tags
)
rescue ::Aws::SQS::Errors::UnsupportedOperation, NameError => e
MU.log "We appear to be in a region that does not support SQS tagging. Skipping tags for #{@mu_name}", MU::NOTICE, details: e.message
end
end
end
end
end
end