nkeyes/philotic

View on GitHub
lib/philotic/publisher.rb

Summary

Maintainability
A
0 mins
Test Coverage
require 'philotic/constants'
require 'philotic/serialization'

module Philotic
  # Publishes messages through the exchange exposed by a connection object.
  #
  # The connection supplies the logger, the configuration and the exchange.
  # This class assembles the publish metadata, normalizes time-like payload
  # values, hands the serialized payload to the exchange and records the
  # outcome on the message.
  class Publisher

    # The connection object every other call is delegated to.
    attr_accessor :connection

    # Optional callable used instead of the logger to report publish attempts.
    # It is called with (severity, metadata, payload, message).
    attr_accessor :log_message_handler

    # connection - object responding to logger, config, connect!, connected?
    #              and exchange (the calls this class makes on it).
    def initialize(connection)
      @connection = connection
    end

    # Returns the logger owned by the connection.
    def logger
      connection.logger
    end

    # Returns the configuration owned by the connection.
    def config
      connection.config
    end

    # Registers a block that receives every publish report in place of the
    # default logger output. The block is called with
    # (severity, metadata, payload, message).
    #
    # This is intentionally public: it is a registration hook for callers and
    # would be unreachable from outside the instance if declared private.
    def on_publish_message(&block)
      @log_message_handler = block
    end

    # Publishes a message and records the result on it.
    #
    # message - object responding to headers, metadata, payload, published=
    #           and publish_error=.
    #
    # message.published is set to true when the payload was handed to the
    # exchange and to false when publishing is disabled or the connection is
    # not established. If a StandardError is raised along the way it is stored
    # in message.publish_error and its message is logged at error level; it is
    # re-raised only when config.raise_error_on_publish is truthy.
    #
    # Returns the message that was passed in.
    def publish(message)
      metadata = {headers: message.headers}
      # Message-level metadata wins over the headers-only default built above.
      metadata.merge!(message.metadata) if message.metadata
      begin
        message.published = _publish(message.payload, metadata)
      rescue => e
        message.publish_error = e
        logger.error e.message
        # Bare raise re-raises the exception currently being handled.
        raise if config.raise_error_on_publish
      end
      message
    end

    # Replaces every payload value that responds to utc (or, failing that,
    # to_utc) with the result of that call.
    #
    # payload - expected to be hash-like: it must yield key/value pairs from
    #           each and support []=.
    #
    # The payload is modified in place; values are only reassigned for keys
    # that already exist, so no keys are added while iterating.
    # NOTE(review): for core Time objects, utc also converts the receiver
    # itself, so the original value objects may change as well.
    #
    # Returns whatever payload.each returns (the payload itself for a Hash).
    def normalize_payload_times(payload)
      payload.each do |key, value|
        if value.respond_to?(:utc)
          payload[key] = value.utc
        elsif value.respond_to?(:to_utc)
          payload[key] = value.to_utc
        end
      end
    end

    private

    # Performs the actual publish.
    #
    # Returns false (after reporting why) when config.disable_publish is
    # truthy or when the connection is still not connected after connect!.
    # Otherwise merges in the configured metadata defaults, normalizes payload
    # times, publishes the serialized payload and returns true.
    def _publish(payload, metadata = {})
      if config.disable_publish
        log_message_published(:warn, metadata, payload, 'attempted to publish a message when publishing is disabled.')
        return false
      end

      connection.connect!
      unless connection.connected?
        log_message_published(:error, metadata, payload, 'unable to publish message, not connected to RabbitMQ')
        return false
      end

      metadata = merge_metadata(metadata)
      payload  = normalize_payload_times(payload)

      connection.exchange.publish(Philotic::Serialization::Serializer.dump(payload, metadata), metadata)
      log_message_published(:debug, metadata, payload, 'published message')
      true
    end

    # Builds the final metadata hash for a publish call.
    #
    # Defaults are read from config for every key in Philotic::MESSAGE_OPTIONS;
    # entries in the supplied metadata override those defaults. The :headers
    # entry always ends up as a hash containing philotic_firehose: true unless
    # the supplied headers provide their own value for that key.
    #
    # Returns a new hash; the metadata argument itself is not modified.
    def merge_metadata(metadata)
      publish_defaults = {}
      Philotic::MESSAGE_OPTIONS.each do |key|
        publish_defaults[key] = config.send(key.to_s)
      end

      merged = publish_defaults.merge(metadata)
      merged[:headers] ||= {}
      merged[:headers] = {philotic_firehose: true}.merge(merged[:headers])
      merged
    end

    # Reports a publish attempt. Uses the registered handler when one is set;
    # otherwise writes a single line to the logger at the given severity.
    def log_message_published(severity, metadata, payload, message)
      if @log_message_handler
        @log_message_handler.call(severity, metadata, payload, message)
      else
        logger.send(severity, "#{message}; metadata:#{metadata}, payload:#{payload.to_json}")
      end
    end
  end
end