cloudamatic/mu: modules/mu/providers/aws/cache_cluster.rb
# Copyright:: Copyright (c) 2014 eGlobalTech, Inc., all rights reserved
#
# Licensed under the BSD-3 license (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License in the root of the project or at
#
#    http://egt-labs.com/mu/LICENSE.html
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module MU
  class Cloud
    class AWS
      # A cache cluster as configured in {MU::Config::BasketofKittens::cache_clusters}
      class CacheCluster < MU::Cloud::CacheCluster

        # Initialize this cloud resource object. Calling +super+ will invoke the initializer defined under {MU::Cloud}, which should set the attributes listed in {MU::Cloud::PUBLIC_ATTRS} as well as applicable dependency shortcuts, like +@vpc+, for us.
        # @param args [Hash]: Hash of named arguments passed via Ruby's double-splat
        def initialize(**args)
          super
          @mu_name ||=
            if @config["create_replication_group"]
              @deploy.getResourceName(@config["name"], max_length: 16, need_unique_string: true)
            else
              @deploy.getResourceName(@config["name"], max_length: 20, need_unique_string: true)
            end

          @mu_name.gsub!(/(--|-$)/i, "")
        end

        # Canonical Amazon Resource Name (ARN) for this resource
        # @return [String]
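        # @example A minimal sketch of the returned string (region, account number, and identifier are illustrative)
        #   cache_cluster.arn
        #   # => "arn:aws:elasticache:us-east-1:123456789012:cluster/mycache-001"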
        def arn
          "arn:"+(MU::Cloud::AWS.isGovCloud?(@region) ? "aws-us-gov" : "aws")+":elasticache:"+@region+":"+MU::Cloud::AWS.credToAcct(@credentials)+":cluster/"+@cloud_id
        end

        # Locate an existing Cache Cluster or Cache Clusters and return an array containing matching AWS resource descriptors for those that match.
        # @return [Hash<String,OpenStruct>]: The cloud provider's complete descriptions of matching Cache Clusters.
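        # @example Look up by cloud identifier, or search by tag (all values illustrative)
        #   MU::Cloud::AWS::CacheCluster.find(cloud_id: "mycache-001", region: "us-east-1")
        #   MU::Cloud::AWS::CacheCluster.find(tag_key: "MU-ID", tag_value: "MY-DEPLOY-2021010100", region: "us-east-1")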
        def self.find(**args)
          found = {}

          if args[:cloud_id]
            cache_cluster = MU::Cloud::AWS::CacheCluster.getCacheClusterById(args[:cloud_id], region: args[:region], credentials: args[:credentials])
            found[args[:cloud_id]] = cache_cluster if cache_cluster
          end

          MU::Cloud::AWS.elasticache(region: args[:region], credentials: args[:credentials]).describe_cache_clusters.cache_clusters.each { |cc|
            if args[:tag_value]
              resp = MU::Cloud::AWS.elasticache(region: args[:region], credentials: args[:credentials]).list_tags_for_resource(
                resource_name: MU::Cloud::AWS::CacheCluster.getARN(cc.cache_cluster_id, "cluster", "elasticache", region: args[:region], credentials: args[:credentials])
              )
              if resp && resp.tag_list && !resp.tag_list.empty?
                resp.tag_list.each { |tag|
                  found[cc.cache_cluster_id] = cc if tag.key == args[:tag_key] and tag.value == args[:tag_value]
                }
              end
            else
              found[cc.cache_cluster_id] = cc
            end
          }

          return found
        end

        # Construct an Amazon Resource Name for an AWS resource.
        # Some APIs require this identifier in order to do things that other APIs can do with shorthand.
        # @param resource [String]: The name of the resource
        # @param resource_type [String]: The type of the resource (eg. cluster, snapshot)
        # @param client_type [String]: The name of the API client (eg. elasticache, rds, ec2, s3)
        # @param region [String]: The region in which the resource resides.
        # @param credentials [String]: The account in which the resource resides.
        # @return [String]
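        # @example Building an ElastiCache cluster ARN (account number illustrative)
        #   MU::Cloud::AWS::CacheCluster.getARN("mycache-001", "cluster", "elasticache", region: "us-east-1")
        #   # => "arn:aws:elasticache:us-east-1:123456789012:cluster:mycache-001"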
        def self.getARN(resource, resource_type, client_type, region: MU.curRegion, credentials: nil)
          aws_str = MU::Cloud::AWS.isGovCloud?(region) ? "aws-us-gov" : "aws"
          "arn:#{aws_str}:#{client_type}:#{region}:#{MU::Cloud::AWS.credToAcct(credentials)}:#{resource_type}:#{resource}"
        end

        # Construct all our tags.
        # @return [Array]: All our standard tags and any custom tags.
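        # @example Approximate shape of the returned array (tag names and values depend on the deployment and config)
        #   allTags
        #   # => [{ key: "MU-ID", value: "MY-DEPLOY-2021010100" }, { key: "mytag", value: "myvalue" }]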
        def allTags
          tags = []
          MU::MommaCat.listStandardTags.each_pair { |name, value|
            tags << {key: name, value: value}
          }

          if @config['optional_tags']
            MU::MommaCat.listOptionalTags.each_pair { |name, value|
              tags << {key: name, value: value}
            }
          end

          if @config['tags']
            @config['tags'].each { |tag|
              tags << {key: tag['key'], value: tag['value']}
            }
          end

          return tags
        end

        # Add our standard tag set to an Amazon ElastiCache resource.
        # @param resource [String]: The name of the resource
        # @param resource_type [String]: The type of the resource
        # @param region [String]: The cloud provider region
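        # @example Tag one member cluster of this deployment (identifier and region illustrative)
        #   addStandardTags("mycache-001", "cluster", region: "us-east-1")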
        def addStandardTags(resource, resource_type, region: MU.curRegion)
          MU.log "Adding tags to ElasticCache resource #{resource}"
          MU::Cloud::AWS.elasticache(region: region).add_tags_to_resource(
            resource_name: MU::Cloud::AWS::CacheCluster.getARN(resource, resource_type, "elasticache", region: @region, credentials: @credentials),
            tags: allTags
          )
        end

        # Called automatically by {MU::Deploy#createResources}
        # @return [String]: The cloud provider's identifier for this cache cluster instance.
        def create
          @config["snapshot_id"] =
            if @config["creation_style"] == "existing_snapshot"
              getExistingSnapshot || createNewSnapshot
            elsif @config["creation_style"] == "new_snapshot"
              createNewSnapshot
            end

          # Should we base all our resource names on @mu_name even though only the cache cluster / cache replication group identifiers are the issue?
          @config['identifier'] = @mu_name
          @config["subnet_group_name"] = @mu_name
          createSubnetGroup

          # Shared configuration elements between cache clusters and cache replication groups
          config_struct = {
            cache_node_type: @config["size"],
            engine: @config["engine"],
            engine_version: @config["engine_version"],
            cache_subnet_group_name: @config["subnet_group_name"],
            preferred_maintenance_window: @config["preferred_maintenance_window"],
            port: @config["port"],
            auto_minor_version_upgrade: @config["auto_minor_version_upgrade"],
            security_group_ids: @config["security_group_ids"],
            tags: allTags
          }

          if @config["engine"] == "redis"
            config_struct[:snapshot_name] = @config["snapshot_id"] if @config["snapshot_id"]
            config_struct[:snapshot_arns] = @config["snapshot_arn"] if @config["snapshot_arn"]
            config_struct[:snapshot_retention_limit] = @config["snapshot_retention_limit"] if @config["snapshot_retention_limit"]
            config_struct[:snapshot_window] = @config["snapshot_window"] if @config["snapshot_window"]
          end

          if @config.has_key?("parameter_group_family")
            # Downcase parameter_group_name because AWS downcases ElastiCache resource names on creation,
            # and we wouldn't be able to find the group again otherwise. Other resource types don't seem
            # to be case-sensitive when we try to retrieve them.
            @config["parameter_group_name"] = @mu_name.downcase
            createParameterGroup
            config_struct[:cache_parameter_group_name] = @config["parameter_group_name"]
          end

          config_struct[:notification_topic_arn] = @config["notification_topic_arn"] if @config["notification_topic_arn"]

          # Mu already cleans up our resources for us when we fail mid creation, so not doing begin/ensure to cleanup in case we fail.
          if @config["create_replication_group"]
            config_struct[:automatic_failover_enabled] = @config['automatic_failover']
            config_struct[:replication_group_id] = @config['identifier']
            config_struct[:replication_group_description] = @mu_name
            config_struct[:num_cache_clusters] = @config["node_count"]
            # config_struct[:primary_cluster_id] = @config["primary_cluster_id"]
            # config_struct[:preferred_cache_cluster_a_zs] = @config["preferred_cache_cluster_azs"]

            MU.log "Creating cache replication group #{@config['identifier']}"
            MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).create_replication_group(config_struct).replication_group

            wait_start_time = Time.now
            retries = 0
            begin
              MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).wait_until(:replication_group_available, replication_group_id: @config['identifier']) do |waiter|
                waiter.max_attempts = nil
                waiter.before_attempt do |attempts|
                  MU.log "Waiting for cache replication group #{@config['identifier']} to become available", MU::NOTICE if attempts % 5 == 0
                end
                waiter.before_wait do |_attempts, r|
                  throw :success if r.replication_groups.first.status == "available"
                  throw :failure if Time.now - wait_start_time > 1800
                end
              end
            rescue Aws::Waiters::Errors::TooManyAttemptsError => e
              raise MuError, "Waited #{(Time.now - wait_start_time).round/60*(retries+1)} minutes for #{@config['identifier']} become available, giving up. #{e}" if retries > 2
              wait_start_time = Time.now
              retries += 1
              retry
            end

            resp = MU::Cloud::AWS::CacheCluster.getCacheReplicationGroupById(@config['identifier'], region: @region)

            # We want to make sure the clusters in the cache replication group get our tags
            resp.member_clusters.each { |member|
              addStandardTags(member, "cluster", region: @region)
            }

            MU::Cloud.resourceClass("AWS", "DNSZone").genericMuDNSEntry(
              name: resp.replication_group_id,
              target: "#{resp.node_groups.first.primary_endpoint.address}.",
              cloudclass: MU::Cloud::CacheCluster,
              sync_wait: @config['dns_sync_wait']
            )

            resp.node_groups.first.node_group_members.each { |member|
              MU::Cloud.resourceClass("AWS", "DNSZone").genericMuDNSEntry(
                name: member.cache_cluster_id,
                target: "#{member.read_endpoint.address}.",
                cloudclass: MU::Cloud::CacheCluster,
                sync_wait: @config['dns_sync_wait']
              )

            }

            MU.log "Cache replication group #{@config['identifier']} is ready to use"
            @cloud_id = resp.replication_group_id
          else
            config_struct[:cache_cluster_id] = @config['identifier']
            config_struct[:az_mode] = @config["multi_az"] ? "cross-az" : "single-az"
            config_struct[:num_cache_nodes] = @config["node_count"]
            # config_struct[:replication_group_id] = @config["replication_group_id"] if @config["replication_group_id"]
            # config_struct[:preferred_availability_zone] = @config["preferred_availability_zone"] if @config["preferred_availability_zone"] && @config["az_mode"] == "single-az"
            # config_struct[:preferred_availability_zones] = @config["preferred_availability_zones"] if @config["preferred_availability_zones"] && @config["az_mode"] == "cross-az"

            MU.log "Creating cache cluster #{@config['identifier']}"
            begin
              MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).create_cache_cluster(config_struct).cache_cluster
            rescue ::Aws::ElastiCache::Errors::InvalidParameterValue => e
              if e.message.match(/security group (sg-[^\s]+)/)
                bad_sg = Regexp.last_match[1]
                MU.log "Removing invalid security group #{bad_sg} from Cache Cluster #{@mu_name}", MU::WARN, details: e.message
                config_struct[:security_group_ids].delete(bad_sg)
                retry
              else
                raise e
              end
            end

            wait_start_time = Time.now
            retries = 0
            begin
              MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).wait_until(:cache_cluster_available, cache_cluster_id: @config['identifier']) do |waiter|
                waiter.max_attempts = nil
                waiter.before_attempt do |attempts|
                  MU.log "Waiting for cache cluster #{@config['identifier']} to become available", MU::NOTICE if attempts % 5 == 0
                end
                waiter.before_wait do |_attempts, r|
                  throw :success if r.cache_clusters.first.cache_cluster_status == "available"
                  throw :failure if Time.now - wait_start_time > 1800
                end
              end
            rescue Aws::Waiters::Errors::TooManyAttemptsError => e
              raise MuError, "Waited #{(Time.now - wait_start_time).round/60*(retries+1)} minutes for #{@config['identifier']} to become available, giving up. #{e}" if retries > 2
              wait_start_time = Time.now
              retries += 1
              retry
            end

            resp = MU::Cloud::AWS::CacheCluster.getCacheClusterById(@config['identifier'], region: @region, credentials: @credentials)
            MU.log "Cache Cluster #{@config['identifier']} is ready to use"
            @cloud_id = resp.cache_cluster_id
          end
        end

        # Create a subnet group for a Cache Cluster with the given config.
        def createSubnetGroup
          subnet_ids = []
          if @config["vpc"] && !@config["vpc"].empty?
            raise MuError.new "Didn't find the VPC specified for #{@mu_name}", details: @config["vpc"].to_h unless @vpc

            vpc_id = @vpc.cloud_id

            # Getting subnet IDs
            if @config["vpc"]["subnets"].empty?
              @vpc.subnets.each { |subnet|
                subnet_ids << subnet.cloud_id
              }
              MU.log "No subnets specified for #{@config['identifier']}, adding all subnets in #{@vpc}", MU::DEBUG
            else
              @config["vpc"]["subnets"].each { |subnet|
                subnet_obj = @vpc.getSubnet(cloud_id: subnet["subnet_id"].to_s, name: subnet["subnet_name"].to_s)
                raise MuError.new "Couldn't find a live subnet matching #{subnet} in #{@vpc}", details: @vpc.subnets if subnet_obj.nil?
                subnet_ids << subnet_obj.cloud_id
              }
            end
          else
            # If we didn't specify a VPC try to figure out if the account has a default VPC
            vpc_id = nil
            subnets = []
            MU::Cloud::AWS.ec2(region: @region, credentials: @credentials).describe_vpcs.vpcs.each { |vpc|
              if vpc.is_default
                vpc_id = vpc.vpc_id
                subnets = MU::Cloud::AWS.ec2(region: @region, credentials: @credentials).describe_subnets(
                  filters: [
                    {
                      name: "vpc-id", 
                      values: [vpc_id]
                    }
                  ]
                ).subnets
                break
              end
            }

            if !subnets.empty?
              mu_subnets = []
              subnets.each { |subnet|
                subnet_ids << subnet.subnet_id
                mu_subnets << {"subnet_id" => subnet.subnet_id}
              }

              @config['vpc'] = {
                "vpc_id" => vpc_id,
                "subnets" => mu_subnets
              }

              MU.log "Using default VPC for cache cluster #{@config['identifier']}"
            end
          end

          if subnet_ids.empty?
            raise MuError, "Can't create subnet group #{@config["subnet_group_name"]} because I couldn't find a VPC or subnets"
          else
            MU.log "Creating subnet group #{@config["subnet_group_name"]} for cache cluster #{@config['identifier']}"

            MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).create_cache_subnet_group(
              cache_subnet_group_name: @config["subnet_group_name"],
              cache_subnet_group_description: @config["subnet_group_name"],
              subnet_ids: subnet_ids
            )

            allowBastionAccess

            if @dependencies.has_key?('firewall_rule')
              @config["security_group_ids"] = []
              @dependencies['firewall_rule'].values.each { |sg|
                @config["security_group_ids"] << sg.cloud_id
              }
            end
          end
        end

        # Create a Cache Cluster parameter group.
        def createParameterGroup        
          MU.log "Creating a cache cluster parameter group #{@config["parameter_group_name"]}"
          MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).create_cache_parameter_group(
            cache_parameter_group_name: @config["parameter_group_name"],
            cache_parameter_group_family: @config["parameter_group_family"],
            description: "Parameter group for #{@config["parameter_group_family"]}"
          )

          if @config.has_key?("parameter_group_parameters") && !@config["parameter_group_parameters"].empty?
            params = []
            @config["parameter_group_parameters"].each { |item|
              params << {parameter_name: item['name'], parameter_value: item['value']}
            }

            MU.log "Modifiying cache cluster parameter group #{@config["parameter_group_name"]}"
            MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).modify_cache_parameter_group(
              cache_parameter_group_name: @config["parameter_group_name"],
              parameter_name_values: params
            )
          end
        end

        # Retrieve the name of an existing Cache Cluster parameter group.
        # @return [String]: Cache Cluster parameter group name.
        def getParameterGroup
          MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).describe_cache_parameter_groups(
            cache_parameter_group_name: @config["parameter_group_name"]
          ).cache_parameter_groups.first.cache_parameter_group_name
        end

        # Called automatically by {MU::Deploy#createResources}
        def groom
          # Do we have anything to do here??
        end

        # Retrieve the complete cloud provider description of a cache cluster.
        # @param cc_id [String]: The cloud provider's identifier for this cache cluster.
        # @param region [String]: The cloud provider's region.
        # @return [OpenStruct]
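        # @example (identifier and region illustrative; returns nil if no such cluster exists)
        #   MU::Cloud::AWS::CacheCluster.getCacheClusterById("mycache-001", region: "us-east-1")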
        def self.getCacheClusterById(cc_id, region: MU.curRegion, credentials: nil)
          begin
            MU::Cloud::AWS.elasticache(region: region, credentials: credentials).describe_cache_clusters(cache_cluster_id: cc_id).cache_clusters.first
          rescue Aws::ElastiCache::Errors::CacheClusterNotFound
            nil
          end
        end
        
        # Retrieve the complete cloud provider description of a cache replication group.
        # @param repl_group_id [String]: The cloud provider's identifier for this cache replication group.
        # @param region [String]: The cloud provider's region.
        # @return [OpenStruct]
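        # @example (identifier and region illustrative)
        #   MU::Cloud::AWS::CacheCluster.getCacheReplicationGroupById("mycache-repl", region: "us-east-1")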
        def self.getCacheReplicationGroupById(repl_group_id, region: MU.curRegion, credentials: nil)
          MU::Cloud::AWS.elasticache(region: region, credentials: credentials).describe_replication_groups(replication_group_id: repl_group_id).replication_groups.first
        end

        # Register a description of this cache cluster / cache replication group with this deployment's metadata. 
        def notify
          ### TO DO: Flatten the replication group deployment metadata structure. It is probably waaaaaaay too nested.
          if @config["create_replication_group"]
            repl_group = MU::Cloud::AWS::CacheCluster.getCacheReplicationGroupById(@config['identifier'], region: @region, credentials: @credentials)
            # DNS records for the "real" zone should always be registered as late as possible so override_existing only overwrites the records after the resource is ready to use.
            if @config['dns_records']
              @config['dns_records'].each { |dnsrec|
                dnsrec['name'] = repl_group.node_groups.first.primary_endpoint.address.downcase if !dnsrec.has_key?('name')
                dnsrec['name'] = "#{dnsrec['name']}.#{MU.environment.downcase}" if dnsrec["append_environment_name"] && !dnsrec['name'].match(/\.#{MU.environment.downcase}$/)
              }
            end
            # XXX this should be a call to @deploy.nameKitten
            MU::Cloud.resourceClass("AWS", "DNSZone").createRecordsFromConfig(@config['dns_records'], target: repl_group.node_groups.first.primary_endpoint.address)

            deploy_struct = {
              "identifier" => repl_group.replication_group_id,
              "create_style" => @config["create_style"],
              "region" => @region,
              "members" => repl_group.member_clusters,
              "automatic_failover" => repl_group.automatic_failover,
              "snapshotting_cluster_id" => repl_group.snapshotting_cluster_id,
              "primary_endpoint" => repl_group.node_groups.first.primary_endpoint.address,
              "primary_port" => repl_group.node_groups.first.primary_endpoint.port
            }

            repl_group.member_clusters.each { |id|
              cluster = MU::Cloud::AWS::CacheCluster.getCacheClusterById(id, region: @region)

              vpc_sg_ids = []
              cluster.security_groups.each { |vpc_sg|
                vpc_sg_ids << vpc_sg.security_group_id
              }

              cache_sg_ids = []
              unless cluster.cache_security_groups.empty?
                cluster.cache_security_groups.each { |cache_sg|
                  cache_sg_ids << cache_sg.security_group_id
                }
              end

              deploy_struct[id] = {
                "configuration_endpoint" => cluster.configuration_endpoint,
                "cache_node_type" => cluster.cache_node_type,
                "engine" => cluster.engine,
                "engine_version" => cluster.engine_version,
                "num_cache_nodes" => cluster.num_cache_nodes,
                "preferred_maintenance_window" => cluster.preferred_maintenance_window,
                "notification_configuration" => cluster.notification_configuration,
                "cache_security_groups" => cache_sg_ids,
                "cache_parameter_group" => cluster.cache_parameter_group.cache_parameter_group_name,
                "cache_subnet_group_name" => cluster.cache_subnet_group_name,
                "cache_nodes" => cluster.cache_nodes,
                "auto_minor_version_upgrade" => cluster.auto_minor_version_upgrade,
                "vpc_security_groups" => vpc_sg_ids,
                "replication_group_id" => cluster.replication_group_id,
                "snapshot_retention_limit" => cluster.snapshot_retention_limit,
                "snapshot_window" => cluster.snapshot_window              
              }
            }

            repl_group.node_groups.first.node_group_members.each{ |member| 
              deploy_struct[member.cache_cluster_id]["cache_node_id"] = member.cache_node_id
              deploy_struct[member.cache_cluster_id]["read_endpoint_address"] = member.read_endpoint.address
              deploy_struct[member.cache_cluster_id]["read_endpoint_port"] = member.read_endpoint.port
              deploy_struct[member.cache_cluster_id]["current_role"] = member.current_role
            }
          else
            cluster = MU::Cloud::AWS::CacheCluster.getCacheClusterById(@config['identifier'], region: @region, credentials: @credentials)

            vpc_sg_ids = []
            cluster.security_groups.each { |vpc_sg|
              vpc_sg_ids << vpc_sg.security_group_id
            }

            cache_sg_ids = []
            unless cluster.cache_security_groups.empty?
              cluster.cache_security_groups.each { |cache_sg|
                cache_sg_ids << cache_sg.security_group_id
              }
            end

            deploy_struct = {
              "cache_node_type" => cluster.cache_node_type,
              "engine" => cluster.engine,
              "engine_version" => cluster.engine_version,
              "num_cache_nodes" => cluster.num_cache_nodes,
              "preferred_maintenance_window" => cluster.preferred_maintenance_window,
              "notification_configuration" => cluster.notification_configuration,
              "cache_security_groups" => cache_sg_ids,
              "cache_parameter_group" => cluster.cache_parameter_group.cache_parameter_group_name,
              "cache_subnet_group_name" => cluster.cache_subnet_group_name,
              "cache_nodes" => cluster.cache_nodes,
              "auto_minor_version_upgrade" => cluster.auto_minor_version_upgrade,
              "vpc_security_groups" => vpc_sg_ids,
              "replication_group_id" => cluster.replication_group_id,
              "snapshot_retention_limit" => cluster.snapshot_retention_limit,
              "snapshot_window" => cluster.snapshot_window              
            }
            if !cluster.configuration_endpoint.nil?
              deploy_struct["configuration_endpoint_address"] = cluster.configuration_endpoint.address
              deploy_struct["configuration_endpoint_port"] = cluster.configuration_endpoint.port
            end
          end

          return deploy_struct
        end

        # Generate a snapshot from the Cache Cluster described in this instance.
        # @return [String]: The cloud provider's identifier for the snapshot.
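        # @example Rough shape of the returned identifier (illustrative; built from the deploy resource name plus a minute-second timestamp)
        #   createNewSnapshot # => "MYDEPLOY-MYCACHE2135"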
        def createNewSnapshot
          snap_id = @deploy.getResourceName(@config["name"]) + Time.new.strftime("%M%S")

          attempts = 0
          begin
            MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).create_snapshot(
              cache_cluster_id: @config["identifier"],
              snapshot_name: snap_id
            )
          rescue Aws::ElastiCache::Errors::InvalidCacheClusterState => e
            if attempts < 10
              MU.log "Tried to create snapshot for #{@config["identifier"]} but cache cluster is busy, retrying a few times"
              attempts += 1
              sleep 30
              retry
            else
              raise MuError, "Failed to create snpashot for cache cluster #{@config["identifier"]}: #{e.inspect}"
            end
          end
          
          attempts = 0
          loop do
            MU.log "Waiting for snapshot of cache cluster  #{@config["identifier"]} to be ready...", MU::NOTICE if attempts % 20 == 0
            MU.log "Waiting for snapshot of cache cluster #{@config["identifier"]} to be ready...", MU::DEBUG

            snapshot_resp = MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).describe_snapshots(snapshot_name: snap_id)
            attempts += 1
            break if snapshot_resp.snapshots.first.snapshot_status == "available"
            sleep 15
          end

          return snap_id
        end

        # Look up an existing snapshot whose name matches this cache cluster's identifier.
        # @return [String]: The cloud provider's identifier for the snapshot.
        def getExistingSnapshot
          MU::Cloud::AWS.elasticache(region: @region, credentials: @credentials).describe_snapshots(snapshot_name: @config["identifier"]).snapshots.first.snapshot_name
        rescue NoMethodError
          raise MuError, "Snapshot #{@config["identifier"]} doesn't exist, make sure you provided a valid snapshot ID/Name"
        end

        # Does this resource type exist as a global (cloud-wide) artifact, or
        # is it localized to a region/zone?
        # @return [Boolean]
        def self.isGlobal?
          false
        end

        # Denote whether this resource implementation is experimental, ready for
        # testing, or ready for production use.
        def self.quality
          MU::Cloud::RELEASE
        end

        # Called by {MU::Cleanup}. Locates resources that were created by the currently-loaded deployment and purges them.
        # @param noop [Boolean]: If true, will only print what would be done.
        # @param ignoremaster [Boolean]: If true, will remove resources not flagged as originating from this Mu server.
        # @param region [String]: The cloud provider's region in which to operate.
        # @return [void]
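        # @example Dry-run cleanup of a deployment's cache artifacts (deploy ID and region illustrative)
        #   MU::Cloud::AWS::CacheCluster.cleanup(noop: true, deploy_id: "MY-DEPLOY-2021010100", region: "us-east-1")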
        def self.cleanup(noop: false, deploy_id: MU.deploy_id, ignoremaster: false, credentials: nil, region: MU.curRegion, flags: {})
          skipsnapshots = flags["skipsnapshots"]
          all_clusters = MU::Cloud::AWS.elasticache(credentials: credentials, region: region).describe_cache_clusters
          our_clusters = []
          our_replication_group_ids = []

          # Because we can't run list_tags_for_resource on a cache cluster that isn't in the "available" state, we load the deploy to make sure we have a cache cluster to clean up.
          # To ensure we don't miss cache clusters that were terminated mid-creation we load the 'original_config'. We might want to find a better approach for this.
          deploy = MU::MommaCat.getLitter(deploy_id)
          if deploy.original_config && deploy.original_config.has_key?("cache_clusters") && !deploy.original_config["cache_clusters"].empty?

            # The ElastiCache API and documentation are a mess: the replication group ARN resource_type is not documented and is not easily guessable.
            # So instead of searching for replication groups directly we'll get their IDs from the cache clusters.
            all_clusters.cache_clusters.each { |cluster|
              cluster_id = cluster.cache_cluster_id

              # The ElastiCache API is buggy... It will throw a CacheClusterNotFound if we run list_tags_for_resource on a cache cluster that isn't in an "available" state.
              if cluster.cache_cluster_status != "available"
                if %w{deleting deleted}.include?(cluster.cache_cluster_status)
                  # The cache cluster might not be ours, so don't notify us about it.
                  next
                else
                  # We can replace this with an AWS waiter.
                  loop do
                    MU.log "Waiting for #{cluster_id} to be in a removable state", MU::NOTICE
                    cluster = MU::Cloud::AWS::CacheCluster.getCacheClusterById(cluster_id, region: region)
                    break if !cluster
                    break unless %w{creating modifying backing-up}.include?(cluster.cache_cluster_status)
                    sleep 60
                  end
                end
              end

              arn = MU::Cloud::AWS::CacheCluster.getARN(cluster_id, "cluster", "elasticache", region: region, credentials: credentials)
              attempts = 0

              begin
                tags = MU::Cloud::AWS.elasticache(credentials: credentials, region: region).list_tags_for_resource(resource_name: arn).tag_list
              rescue Aws::ElastiCache::Errors::CacheClusterNotFound => e
                if attempts < 5
                  MU.log "Can't get tags for #{cluster_id}, retrying a few times in case of a lagging resource", MU::WARN
                  MU.log "arn #{arn}", MU::WARN
                  attempts += 1
                  sleep 30
                  retry
                else
                  raise MuError, "Failed to get tags for cache cluster #{cluster_id}, MU-ID #{deploy_id}: #{e.inspect}"
                end
              end

              found_muid = false
              found_master = false
              tags.each { |tag|
                found_muid = true if tag.key == "MU-ID" && tag.value == deploy_id
                found_master = true if tag.key == "MU-MASTER-IP" && tag.value == MU.mu_public_ip
              }
              next if !found_muid

              delete =
                if ignoremaster && found_muid
                  true
                elsif !ignoremaster && found_muid && found_master
                  true
                else
                  false
                end

              if delete
                cluster.replication_group_id ? our_replication_group_ids << cluster.replication_group_id : our_clusters << cluster
              end
            }

            threads = []

            # Make sure we have only unique replication group IDs
            our_replication_group_ids = our_replication_group_ids.uniq
            if !our_replication_group_ids.empty?
              our_replication_group_ids.each { |group_id|
                replication_group = MU::Cloud::AWS::CacheCluster.getCacheReplicationGroupById(group_id, region: region)
                parent_thread_id = Thread.current.object_id
                threads << Thread.new(replication_group) { |myrepl_group|
                  MU.dupGlobals(parent_thread_id)
                  Thread.abort_on_exception = true
                  terminate_replication_group(myrepl_group, noop: noop, skipsnapshots: skipsnapshots, region: region, credentials: credentials)
                }
              }
            end

            # Hmmmm. Do we need to have separate thread groups for clusters and replication groups?
            if !our_clusters.empty?
              our_clusters.each { |cluster|
                parent_thread_id = Thread.current.object_id
                threads << Thread.new(cluster) { |mycluster|
                  MU.dupGlobals(parent_thread_id)
                  Thread.abort_on_exception = true
                  terminate_cache_cluster(mycluster, noop: noop, skipsnapshots: skipsnapshots, region: region, credentials: credentials)
                }
              }
            end

            # Wait for all of the cache clusters and replication groups to finish cleanup before proceeding
            threads.each { |t|
              t.join
            }
          end
        end

        # Cloud-specific configuration properties.
        # @param _config [MU::Config]: The calling MU::Config object
        # @return [Array<Array,Hash>]: List of required fields, and json-schema Hash of cloud-specific configuration parameters for this resource
        def self.schema(_config)
          toplevel_required = []
          schema = {
            "create_replication_group" => {
              "type" => "boolean",
              "description" => "Create a replication group; will be set automatically if +engine+ is +redis+ and +node_count+ is greated than one."
            },
            "ingress_rules" => MU::Cloud.resourceClass("AWS", "FirewallRule").ingressRuleAddtlSchema
          }
          [toplevel_required, schema]
        end

        # Cloud-specific pre-processing of {MU::Config::BasketofKittens::cache_clusters}, bare and unvalidated.
        # @param cache [Hash]: The resource to process and validate
        # @param configurator [MU::Config]: The overall deployment configurator of which this resource is a member
        # @return [Boolean]: True if validation succeeded, False otherwise
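        # @example Keys this validator inspects (values illustrative; +configurator+ is the active MU::Config)
        #   cache = { "name" => "sessioncache", "engine" => "redis", "size" => "cache.t2.medium", "node_count" => 2, "multi_az" => true }
        #   MU::Cloud::AWS::CacheCluster.validateConfig(cache, configurator) # => true or false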
        def self.validateConfig(cache, configurator)
          ok = true

          if !cache['vpc']
            siblings = configurator.haveLitterMate?(nil, "vpcs", has_multiple: true)
            if siblings.size == 1
              MU.log "CacheCluster #{cache['name']} did not declare a VPC. Inserting into sibling VPC #{siblings[0]['name']}.", MU::WARN
              cache["vpc"] = {
                "name" => siblings[0]['name'],
                "subnet_pref" => "all_private"
              }
            elsif MU::Cloud::AWS.hosted? and MU::Cloud::AWS.myVPCObj
              cache["vpc"] = {
                "id" => MU.myVPC,
                "subnet_pref" => "all_private"
              }
            else
              MU.log "CacheCluster #{cache['name']} must declare a VPC", MU::ERR
              ok = false
            end

            # Re-insert ourselves with this modification so that our child
            # resources get this VPC we just shoved in
            if ok and cache['vpc']
              cache.delete("#MU_VALIDATED")
              return configurator.insertKitten(cache, "cache_clusters", overwrite: true)
            end
          end

          if cache.has_key?("parameter_group_parameters") && cache["parameter_group_family"].nil?
            MU.log "parameter_group_family must be set when setting parameter_group_parameters", MU::ERR
            ok = false
          end
          if cache["engine"] == "redis"
            # We aren't required to create a cache replication group for a single Redis cache cluster,
            # however the AWS console does exactly that, so we will follow the same behavior.
            if cache["node_count"] > 1
              cache["create_replication_group"] = true
              cache["automatic_failover"] = cache["multi_az"]
            end

            # Some instance types don't support snapshotting
            if %w{cache.t2.micro cache.t2.small cache.t2.medium}.include?(cache["size"])
              if cache.has_key?("snapshot_retention_limit") || cache.has_key?("snapshot_window")
                MU.log "Can't set snapshot_retention_limit or snapshot_window on #{cache["size"]}", MU::ERR
                ok = false
              end
            end
          elsif cache["engine"] == "memcached"
            cache["create_replication_group"] = false

            if cache["node_count"] > 20
              MU.log "#{cache['engine']} supports up to 20 nodes per cache cluster", MU::ERR
              ok = false
            end

            # memcached doesn't support snapshots
            if cache.has_key?("snapshot_retention_limit") || cache.has_key?("snapshot_window")
              MU.log "Can't set snapshot_retention_limit or snapshot_window on #{cache["engine"]}", MU::ERR
              ok = false
            end
          end

          ok
        end

        private

        # Remove a Cache Cluster and associated artifacts
        # @param cluster [OpenStruct]: The cloud provider's description of the Cache Cluster artifact.
        # @param noop [Boolean]: If true, will only print what would be done.
        # @param skipsnapshots [Boolean]: If true, will not create a last snapshot before terminating the Cache Cluster.
        # @param region [String]: The cloud provider's region in which to operate.
        # @return [void]
        def self.terminate_cache_cluster(cluster, noop: false, skipsnapshots: false, region: MU.curRegion, credentials: nil)
          raise MuError, "terminate_cache_cluster requires a non-nil cache cluster descriptor" if cluster.nil? || cluster.empty?

          cluster_id = cluster.cache_cluster_id
          subnet_group = cluster.cache_subnet_group_name
          parameter_group = cluster.cache_parameter_group.cache_parameter_group_name

          # hmmmmm we can use an AWS waiter for this...
          unless cluster.cache_cluster_status == "available"
            loop do
              MU.log "Waiting for #{cluster_id} to be in a removable state...", MU::NOTICE
              cluster = MU::Cloud::AWS::CacheCluster.getCacheClusterById(cluster_id, region: region, credentials: credentials)
              break unless %w{creating modifying backing-up}.include?(cluster.cache_cluster_status)
              sleep 60
            end
          end

          # The API is broken: cluster.cache_nodes returns an empty array, and the only URL we can get is the configuration endpoint via cluster.configuration_endpoint.address.
          # MU::Cloud.resourceClass("AWS", "DNSZone").genericMuDNSEntry(name: cluster_id, target: , cloudclass: MU::Cloud::CacheCluster, delete: true)
          
          if %w{deleting deleted}.include?(cluster.cache_cluster_status)
            MU.log "#{cluster_id} has already been terminated", MU::WARN
          else          
            unless noop
              def self.clusterSkipSnap(cluster_id, region, credentials)
                # We're calling this several times, so let's define it once
                MU.log "Terminating #{cluster_id}. Not saving final snapshot"
                MU::Cloud::AWS.elasticache(region: region, credentials: credentials).delete_cache_cluster(cache_cluster_id: cluster_id)
              end

              def self.clusterCreateSnap(cluster_id, region, credentials)
                MU.log "Terminating #{cluster_id}. Final snapshot name: #{cluster_id}-mufinal"
                MU::Cloud::AWS.elasticache(region: region, credentials: credentials).delete_cache_cluster(cache_cluster_id: cluster_id, final_snapshot_identifier: "#{cluster_id}-MUfinal")
              end

              retries = 0
              begin
                if cluster.engine == "memcached"
                  clusterSkipSnap(cluster_id, region, credentials)
                else
                  skipsnapshots ? clusterSkipSnap(cluster_id, region, credentials) : clusterCreateSnap(cluster_id, region, credentials)
                end
              rescue Aws::ElastiCache::Errors::InvalidCacheClusterState => e
                if retries < 5
                  MU.log "#{cluster_id} is not in a removable state, retrying several times", MU::WARN
                  retries += 1
                  sleep 30
                  retry
                else
                  MU.log "#{cluster_id} is not in a removable state after several retries, giving up. #{e.inspect}", MU::ERR
                  return
                end
              rescue Aws::ElastiCache::Errors::SnapshotAlreadyExistsFault
                MU.log "Snapshot #{cluster_id}-MUfinal already exists", MU::WARN
                clusterSkipSnap(cluster_id, region, credentials)
              rescue Aws::ElastiCache::Errors::SnapshotQuotaExceededFault
                MU.log "Snapshot quota exceeded while deleting #{cluster_id}", MU::ERR
                clusterSkipSnap(cluster_id, region, credentials)
              end

              wait_start_time = Time.now
              retries = 0
              begin
                MU::Cloud::AWS.elasticache(region: region, credentials: credentials).wait_until(:cache_cluster_deleted, cache_cluster_id: cluster_id) do |waiter|
                  waiter.max_attempts = nil
                  waiter.before_attempt do |attempts|
                    MU.log "Waiting for cache cluster #{cluster_id} to delete..", MU::NOTICE if attempts % 10 == 0
                  end
                  waiter.before_wait do |_attempts, resp|
                    throw :success if resp.cache_clusters.first.cache_cluster_status == "deleted"
                    throw :failure if Time.now - wait_start_time > 1800
                  end
                end
              rescue Aws::Waiters::Errors::TooManyAttemptsError => e
                raise MuError, "Waited for #{(Time.now - wait_start_time).round/60*(retries+1)} minutes for cache cluster to delete, giving up. #{e}" if retries > 2
                wait_start_time = Time.now
                retries += 1
                retry
              rescue Aws::Waiters::Errors::FailureStateError
                MU.log "#{cluster_id} disappeared on us"
              end
            end
          end

          MU.log "#{cluster_id} has been terminated"

          unless noop
            delete_subnet_group(subnet_group, region: region, credentials: credentials) if subnet_group
            delete_parameter_group(parameter_group, region: region, credentials: credentials) if parameter_group && !parameter_group.start_with?("default")
          end
        end
        private_class_method :terminate_cache_cluster

        # Remove a Cache Cluster Replication Group and associated artifacts
        # @param repl_group [OpenStruct]: The cloud provider's description of the Cache Cluster artifact.
        # @param noop [Boolean]: If true, will only print what would be done.
        # @param skipsnapshots [Boolean]: If true, will not create a last snapshot before terminating the Cache Cluster.
        # @param region [String]: The cloud provider's region in which to operate.
        # @return [void]
        def self.terminate_replication_group(repl_group, noop: false, skipsnapshots: false, region: MU.curRegion, credentials: nil)
          raise MuError, "terminate_replication_group requires a non-nil cache replication group descriptor" if repl_group.nil? || repl_group.empty?

          repl_group_id = repl_group.replication_group_id
          # We're assuming that all clusters in this replication group were created in the same deployment, and so share the same subnet group, parameter group, etc.
          cluster_id = repl_group.member_clusters.first
          cluster = MU::Cloud::AWS::CacheCluster.getCacheClusterById(cluster_id, region: region)
          subnet_group = cluster.cache_subnet_group_name
          parameter_group = cluster.cache_parameter_group.cache_parameter_group_name

          # hmmmmm we can use an AWS waiter for this...
          unless repl_group.status == "available"
            loop do
              MU.log "Waiting for #{repl_group_id} to be in a removable state...", MU::NOTICE
              repl_group = MU::Cloud::AWS::CacheCluster.getCacheReplicationGroupById(repl_group_id, region: region)
              break unless %w{creating modifying backing-up}.include?(repl_group.status)
              sleep 60
            end
          end

          # What's the likelihood of having more than one node group? Maybe iterate over node_groups instead of assuming there is only one?
          MU::Cloud.resourceClass("AWS", "DNSZone").genericMuDNSEntry(name: repl_group_id, target: repl_group.node_groups.first.primary_endpoint.address, cloudclass: MU::Cloud::CacheCluster, delete: true)
          # Assuming we also created DNS records for each of our cluster's read endpoint.
          repl_group.node_groups.first.node_group_members.each { |member|
            MU::Cloud.resourceClass("AWS", "DNSZone").genericMuDNSEntry(name: member.cache_cluster_id, target: member.read_endpoint.address, cloudclass: MU::Cloud::CacheCluster, delete: true)
          }
          
          if %w{deleting deleted}.include?(repl_group.status)
            MU.log "#{repl_group_id} has already been terminated", MU::WARN
          else
            unless noop
              def self.skipSnap(repl_group_id, region, credentials)
                # We're calling this several times, so let's define it once
                MU.log "Terminating #{repl_group_id}. Not saving final snapshot"
                MU::Cloud::AWS.elasticache(region: region, credentials: credentials).delete_replication_group(
                  replication_group_id: repl_group_id,
                  retain_primary_cluster: false
                )
              end

              def self.createSnap(repl_group_id, region, credentials)
                MU.log "Terminating #{repl_group_id}. Final snapshot name: #{repl_group_id}-mufinal"
                MU::Cloud::AWS.elasticache(region: region, credentials: credentials).delete_replication_group(
                  replication_group_id: repl_group_id,
                  retain_primary_cluster: false,
                  final_snapshot_identifier: "#{repl_group_id}-mufinal"
                )
              end

              retries = 0
              begin
                skipsnapshots ? skipSnap(repl_group_id, region, credentials) : createSnap(repl_group_id, region, credentials)
              rescue Aws::ElastiCache::Errors::InvalidReplicationGroupState => e
                if retries < 5
                  MU.log "#{repl_group_id} is not in a removable state, retrying several times", MU::WARN
                  retries += 1
                  sleep 30
                  retry
                else
                  MU.log "#{repl_group_id} is not in a removable state after several retries, giving up. #{e.inspect}", MU::ERR
                  return
                end
              rescue Aws::ElastiCache::Errors::SnapshotAlreadyExistsFault
                MU.log "Snapshot #{repl_group_id}-MUfinal already exists", MU::WARN
                skipSnap(repl_group_id, region, credentials)
              rescue Aws::ElastiCache::Errors::SnapshotQuotaExceededFault
                MU.log "Snapshot quota exceeded while deleting #{repl_group_id}", MU::ERR
                skipSnap(repl_group_id, region, credentials)
              end

              wait_start_time = Time.now
              retries = 0
              begin
                MU::Cloud::AWS.elasticache(region: region).wait_until(:replication_group_deleted, replication_group_id: repl_group_id) do |waiter|
                  waiter.max_attempts = nil
                  waiter.before_attempt do |attempts|
                    MU.log "Waiting for #{repl_group_id} to delete..", MU::NOTICE if attempts % 10 == 0
                  end
                  waiter.before_wait do |_attempts, resp|
                    throw :success if resp.replication_groups.first.status == "deleted"
                    throw :failure if Time.now - wait_start_time > 1800
                  end
                end
              rescue Aws::Waiters::Errors::TooManyAttemptsError => e
                raise MuError, "Waited for #{(Time.now - wait_start_time).round/60*(retries+1)} minutes for #{repl_group_id} to delete, giving up. #{e}" if retries > 2
                wait_start_time = Time.now
                retries += 1
                retry
              rescue Aws::Waiters::Errors::FailureStateError
                MU.log "#{repl_group_id} disappeared on us"
              end
            end
          end

          MU.log "#{repl_group_id} has been terminated"
          unless noop
            MU::Cloud::AWS::CacheCluster.delete_subnet_group(subnet_group, region: region) if subnet_group
            MU::Cloud::AWS::CacheCluster.delete_parameter_group(parameter_group, region: region) if parameter_group && !parameter_group.start_with?("default")
          end
        end
        private_class_method :terminate_replication_group

        # Remove a Cache Cluster Subnet Group.
        # @param subnet_group_id [string]: The cloud provider's ID of the cache cluster subnet group.
        # @param region [String]: The cloud provider's region in which to operate.
        # @return [void]
        def self.delete_subnet_group(subnet_group_id, region: MU.curRegion, credentials: nil)
          retries ||= 0
          MU.log "Deleting Subnet group #{subnet_group_id}"
          MU::Cloud::AWS.elasticache(region: region, credentials: credentials).delete_cache_subnet_group(cache_subnet_group_name: subnet_group_id)
        rescue Aws::ElastiCache::Errors::CacheSubnetGroupNotFoundFault
          MU.log "Subnet group #{subnet_group_id} disappeared before we could remove it", MU::WARN
        rescue Aws::ElastiCache::Errors::CacheSubnetGroupInUse => e
          if retries < 5
            MU.log "Subnet group #{subnet_group_id} is not in a removable state, retrying", MU::WARN
            retries += 1
            sleep 30
            retry
          else
            MU.log "Subnet group #{subnet_group_id} is not in a removable state after several retries, giving up. #{e.inspect}", MU::ERR
          end
        end
        private_class_method :delete_subnet_group

        # Remove a Cache Cluster Parameter Group.
        # @param parameter_group_id [string]: The cloud provider's ID of the cache cluster parameter group.
        # @param region [String]: The cloud provider's region in which to operate.
        # @return [void]
        def self.delete_parameter_group(parameter_group_id, region: MU.curRegion, credentials: nil)
          retries ||= 0
          MU.log "Deleting parameter group #{parameter_group_id}"
          MU::Cloud::AWS.elasticache(region: region, credentials: credentials).delete_cache_parameter_group(
            cache_parameter_group_name: parameter_group_id
          )
        rescue Aws::ElastiCache::Errors::CacheParameterGroupNotFound
          MU.log "Parameter group #{parameter_group_id} disappeared before we could remove it", MU::WARN
        rescue Aws::ElastiCache::Errors::InvalidCacheParameterGroupState => e
          if retries < 5
            MU.log "Parameter group #{parameter_group_id} is not in a removable state, retrying", MU::WARN
            retries += 1
            sleep 30
            retry
          else
            MU.log "Parameter group #{parameter_group_id} is not in a removable state after several retries, giving up. #{e.inspect}", MU::ERR
          end
        end
        private_class_method :delete_parameter_group
      end
    end
  end
end