kayaman/klunk

View on GitHub
lib/klunk/topic.rb

Summary

Maintainability
A
0 mins
Test Coverage
module Klunk
  class Topic

    class << self

      def topics
        if File.exists?('config/topics.yml')
          ::YAML.load_file('config/topics.yml').map(&:deep_symbolize_keys)
        else
          []
        end
      end

      def name_for(topic_name, options = {})
        system_name = options[:system] || Klunk.configuration.prefix
        [system_name, ENV['EB_ENV'], topic_name]
        .compact.reject(&:blank?).join('_')
      end

      def create(topic_name)
        topic = client.create_topic(name: name_for(topic_name))
        puts "Topic created: #{topic.topic_arn}".cyan
        topic
      end

      def publish(topic_name, message)
        topic_arn = topic_arn(topic_name)
        puts "Publishing to #{topic_arn}: #{message}"
        client.publish(topic_arn: topic_arn(topic_name), message: message)
      end

      def topic_arn(topic_name, options = {})
        "arn:aws:sns:#{ENV['AWS_REGION']}:#{ENV['AWS_ACCOUNT_ID']}:#{name_for(topic_name, options)}"
      end

      def subscribe(queue_url, topic_arn, previous_policy = nil)
        queue_attributes = Klunk::Queue.get_attributes(queue_url)
        queue_arn = queue_attributes['QueueArn']
        subscription = client.subscribe(
        topic_arn: topic_arn, protocol: 'sqs', endpoint: queue_arn
        )
        client.set_subscription_attributes(
        subscription_arn: subscription.subscription_arn,
        attribute_name: 'RawMessageDelivery', attribute_value: 'true'
        )
        if queue_attributes.key?('Policy')
          previous_policy = JSON.parse(queue_attributes['Policy'])
        end
        add_policy(queue_url, topic_arn, previous_policy)
      end

      def describe(topic_name, options = {})
        puts topic_arn(topic_name, options)
        {
          topic: topic_arn(topic_name, options),
          subscriptions: client.list_subscriptions_by_topic(
          topic_arn: topic_arn(topic_name, options)
          ).subscriptions.map { |topic| topic[:endpoint] }
        }
      end

      def add_policy(queue_url, topic_arn, previous_policy)
        previous_policy ||= build_policy(queue_url, topic_arn)
        Queue.client.set_queue_attributes(
        queue_url: queue_url,
        attributes: {
          Policy: previous_policy.tap do |p|
            (p['Statement'] ||= []) << build_statement(queue_url, topic_arn)
            p['Statement'].uniq!
          end.to_json
        }
        )
      end

      def build_statement(queue_url, topic_arn)
        queue_arn = Klunk::Queue.get_attributes(queue_url)['QueueArn']
        queue_name = queue_arn.split(':').last
        topic_name = topic_arn.split(':').last
        {
          'Sid': "#{queue_name.camelize}_Send_#{topic_name.camelize}",
          'Effect': 'Allow',
          'Principal': { 'AWS': '*' },
          'Action': 'SQS:SendMessage',
          'Resource': queue_arn,
          'Condition': {
            'ArnEquals': { 'aws:SourceArn': topic_arn }
          }
        }
      end

      def build_policy(queue_url, topic_arn)
        queue_arn = Klunk::Queue.get_attributes(queue_url)['QueueArn']
        {
          'Version':  '2012-10-17',
          'Id': "#{queue_arn}/SQSDefaultPolicy",
          'Statement': [
            build_statement(queue_url, topic_arn)
          ]
        }
      end

      def client
        @client ||= Aws::SNS::Client.new
      end
    end
  end
end