nkeyes/philotic

View on GitHub
lib/philotic/connection.rb

Summary

Maintainability
A
35 mins
Test Coverage
require 'json'
require 'bunny'
require 'logger'
require 'bunny'
require 'i18n/core_ext/hash'
require 'philotic/constants'
require 'philotic/config'
require 'philotic/publisher'
require 'philotic/subscriber'

module Philotic
  class Connection

    # Raised when a TCP connection to the broker could not be established.
    #
    # The URL handed to the constructor is embedded in the error message and is
    # also available through #url.
    class TCPConnectionFailed < StandardError
      # @return [Object] the URL that was passed to the constructor
      attr_reader :url

      # @param message [String] description of the failure
      # @param url [String] URL of the server that could not be reached
      def initialize(message, url)
        # Keep the URL so the reader declared above returns it instead of nil.
        @url = url
        super("Could not establish TCP connection to #{url}: #{message}")
      end
    end

    # Provides def_delegators, used at the bottom of the class body.
    extend Forwardable

    # Read-only state: the object created by Bunny.new and the running count of
    # connection attempts.
    attr_reader :connection, :connection_attempts

    # Writers only. The matching readers (#logger, #publisher, #subscriber) are
    # written by hand in this class because they build a default lazily;
    # generating a reader for :logger here would only be redefined later.
    attr_writer :logger, :publisher, :subscriber

    # Creates a connection wrapper with the attempt counter at zero.
    # Nothing is connected here; see #connect!.
    def initialize
      @connection_attempts = 0
    end

    # Returns the publisher for this connection, building it on first use.
    #
    # @return [Philotic::Publisher] the memoized publisher (or whatever was
    #   assigned through #publisher=)
    def publisher
      return @publisher if @publisher

      @publisher = Philotic::Publisher.new(self)
    end

    # Returns the subscriber for this connection, building it on first use.
    #
    # @return [Philotic::Subscriber] the memoized subscriber (or whatever was
    #   assigned through #subscriber=)
    def subscriber
      return @subscriber if @subscriber

      @subscriber = Philotic::Subscriber.new(self)
    end

    # Returns the configuration object for this connection, building it on
    # first use.
    #
    # @return [Philotic::Config] the memoized configuration
    def config
      return @config if @config

      @config = Philotic::Config.new(self)
    end

    # Opens the connection unless one is already up, then registers the
    # exchange return handler.
    #
    # @return [true] when a new connection was established
    # @return [false] when the connection is still down after the attempt
    # @return [nil] when already connected (nothing is done in that case)
    def connect!
      return if connected?

      start_connection!

      unless connected?
        logger.error { "Failed to connect to RabbitMQ: #{config.sanitized_rabbit_url}" }
        return false
      end

      logger.info { "Connected to RabbitMQ: #{config.sanitized_rabbit_url}" }
      set_exchange_return_handler!
      true
    end

    # Runs #attempt_connection, retrying on Bunny TCP failures until the
    # attempt counter reaches config.connection_attempts.
    #
    # @raise [TCPConnectionFailed] once the configured number of attempts has
    #   been used up; the attempt counter is reset to zero before raising
    def start_connection!
      attempt_connection
    rescue ::Bunny::TCPConnectionFailed
      # attempt_connection increments the counter itself, so this stays bounded.
      retry if connection_attempts < config.connection_attempts

      attempts_made        = connection_attempts
      @connection_attempts = 0
      raise TCPConnectionFailed.new("Failed to connect to RabbitMQ server after #{attempts_made} attempts", config.sanitized_rabbit_url)
    end

    # Makes a single connection attempt.
    #
    # Bumps the attempt counter, logs a warning for every attempt after the
    # first, builds a new Bunny object and starts it. The counter is reset to
    # zero only when starting did not raise.
    def attempt_connection
      @connection_attempts += 1

      if connection_attempts > 1
        logger.warn do
          "Connecting to RabbitMQ: #{config.sanitized_rabbit_url}. Attempt #{connection_attempts} of #{config.connection_attempts}"
        end
      end

      # Assign before starting so @connection refers to the new object even
      # when #start raises.
      @connection = Bunny.new(config.rabbit_url, connection_settings)
      @connection.start
      @connection_attempts = 0
    end

    # Collects the options passed to Bunny.new as its second argument.
    #
    # @return [Hash{Symbol => Object}] the three settings below, each read
    #   from the same-named reader on #config
    def connection_settings
      %i[automatically_recover network_recovery_interval continuation_timeout].each_with_object({}) do |setting, settings|
        settings[setting] = config.public_send(setting)
      end
    end

    # Closes the connection if it is up and forgets the memoized channel and
    # exchange. The warning is logged whether or not a connection is open.
    #
    # @return [nil]
    def close
      logger.warn { "closing connection to RabbitMQ: #{config.sanitized_rabbit_url}" }
      connection.close if connected?

      # Drop the memoized objects so #channel and #exchange rebuild them.
      @channel = @exchange = nil
    end

    # Tells whether a connection object exists and reports itself connected.
    #
    # @return [Object] the falsy #connection value when there is none,
    #   otherwise the result of connection.connected?
    def connected?
      session = connection
      return session unless session

      session.connected?
    end

    # Returns the channel for this connection, creating it on first use via
    # connection.create_channel. The value is memoized until #close clears it.
    def channel
      return @channel if @channel

      @channel = connection.create_channel
    end

    # Returns the default exchange, declaring it on first use.
    #
    # The channel method named by config.exchange_type is invoked with
    # config.exchange_name and durable: true; the result is memoized until
    # #close clears it.
    def exchange
      return @exchange if @exchange

      @exchange = channel.send(config.exchange_type, config.exchange_name, durable: true)
    end

    # Registers an on_return callback on the default exchange that forwards
    # its three arguments to config.message_return_handler.
    #
    # The handler is looked up from #config each time the callback fires, not
    # when it is registered.
    def set_exchange_return_handler!
      exchange.on_return do |return_info, properties, body|
        handler = config.message_return_handler
        handler.call(return_info, properties, body)
      end
    end

    # Creates the named queue and its bindings, replacing an existing queue
    # only when the connection config allows it.
    #
    # Note that the +config+ parameter shadows the #config method, which is
    # why the connection-level configuration is read through self.config.
    #
    # @param queue_name [String] name of the queue to create
    # @param config [#deep_symbolize_keys] queue definition; after key
    #   symbolization it is handed to #queue_from_config and #bind_queue
    # @raise [RuntimeError] when self.config.initialize_named_queues is falsy
    def initialize_named_queue!(queue_name, config)
      unless self.config.initialize_named_queues
        raise RuntimeError, 'Philotic.config.initialize_named_queues must be true to run Philotic.initialize_named_queue!'
      end

      connect!
      queue_exists = connection.queue_exists?(queue_name)
      replacing    = queue_exists && self.config.delete_existing_queues

      # An existing queue is left alone unless deletion is enabled.
      if queue_exists && !replacing
        return logger.warn { "Queue #{queue_name} not created; it already exists. self.config.delete_existing_queues must be true to override." }
      end

      if replacing
        channel.queue(queue_name, passive: true).delete
        logger.info { "deleted old queue. queue: #{queue_name}" }
      end

      queue_config = config.deep_symbolize_keys
      bind_queue(queue_from_config(queue_name, queue_config), queue_config)
    end

    # Binds +queue+ to the exchange selected by +config+, once per entry in
    # config[:bindings].
    #
    # @param queue [#bind, #name] queue to bind
    # @param config [Hash] queue definition; :bindings lists the binding
    #   arguments and may be absent, in which case nothing is bound
    # @return [Object] the result of the final logger.info call
    def bind_queue(queue, config)
      queue_exchange = exchange_from_config(config)
      # A definition without :bindings has nothing to bind; treat it as an
      # empty list instead of failing on nil after the queue already exists.
      bindings = config[:bindings] || []

      bindings.each do |arguments|
        queue.bind(queue_exchange, {arguments: arguments})
        logger.info { "Added binding to queue. queue: #{queue.name} binding: #{arguments}" }
      end

      logger.info { "Finished adding bindings to queue. queue: #{queue.name}" }
    end

    # Picks the exchange a queue should be bound to.
    #
    # @param config [Hash] queue definition; a truthy :exchange entry names a
    #   dedicated exchange
    # @return [Object] the exchange declared under config[:exchange] (using
    #   self.config.exchange_type, durable), or the default #exchange when no
    #   name is given
    def exchange_from_config(config)
      custom_name = config[:exchange]
      return exchange unless custom_name

      channel.send(self.config.exchange_type, custom_name, durable: true)
    end

    # Declares the queue on the channel and logs its creation.
    #
    # @param queue_name [String] name of the queue
    # @param config [Hash] queue definition; :options (if present) is merged
    #   over Philotic::DEFAULT_NAMED_QUEUE_OPTIONS
    # @return [Object] whatever channel.queue returns
    def queue_from_config(queue_name, config)
      overrides     = config[:options] || {}
      # Non-destructive merge: the shared defaults constant is left untouched.
      queue_options = Philotic::DEFAULT_NAMED_QUEUE_OPTIONS.merge(overrides)

      created = channel.queue(queue_name, queue_options)
      logger.info { "Created queue. queue:#{queue_name}" }
      created
    end

    # Returns the logger, building a default on first use when none was
    # assigned through #logger=.
    #
    # The default writes to STDOUT at the level given by config.log_level.
    #
    # @return [Logger] the memoized or assigned logger
    def logger
      return @logger if @logger

      # Assign first, then configure, so @logger is set before #config is consulted.
      @logger       = Logger.new(STDOUT)
      @logger.level = config.log_level
      @logger
    end

    # Expose the public instance methods declared directly on the publisher
    # and subscriber classes as methods of the connection, forwarding to
    # #publisher and #subscriber respectively. :connection, :config and
    # :logger are left out of the delegated set.
    { publisher: Philotic::Publisher, subscriber: Philotic::Subscriber }.each do |accessor, delegate_class|
      def_delegators accessor, *(delegate_class.public_instance_methods(false) - [:connection, :config, :logger])
    end
  end
end