ManageIQ/manageiq-messaging

View on GitHub
lib/manageiq/messaging/kafka/client.rb

Summary

Maintainability
A
55 mins
Test Coverage
B
80%
module ManageIQ
  module Messaging
    module Kafka
      # Messaging client implementation with Kafka being the underlying supporting system.
      # Do not directly instantiate an instance from this class. Use
      # +ManageIQ::Messaging::Client.open+ method.
      #
      # Kafka specific connection options accepted by +open+ method:
      # * :client_ref (A reference string to identify the client)
      # * :hosts (Array of Kafka cluster hosts, or a single host given as :host)
      # * :port (Port appended to every host)
      #
      # For additional security options, please refer to
      # https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka and
      # https://github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka
      #
      #
      # Kafka specific +publish_message+ options:
      # * :group_name (Used as Kafka partition_key)
      #
      # Kafka specific +subscribe_topic+ options:
      # * :persist_ref (Used as Kafka group_id)
      # * :session_timeout (Max time in seconds allowed to process a message, default is 30)
      #
      # Kafka specific +subscribe_messages+ options:
      # * :max_bytes (Max batch size to read, default is 10Mb)
      # * :session_timeout (Max time in seconds allowed to process a message, default is 30)
      #
      # Without +:persist_ref+ every topic subscriber receives a copy of each message
      # only when they are active. If multiple topic subscribers join with the same
      # +:persist_ref+, each message is consumed by only one of the subscribers. This
      # allows a load balancing among the subscribers. Also any messages sent when
      # all members of the +:persist_ref+ group are offline will be persisted and delivered
      # when any member in the group is back online. Each message is still copied and
      # delivered to other subscribers that belong to other +:persist_ref+ groups or no group.
      #
      # +subscribe_background_job+ is currently not implemented.
      class Client < ManageIQ::Messaging::Client
        require 'rdkafka'
        require 'manageiq/messaging/kafka/common'
        require 'manageiq/messaging/kafka/queue'
        require 'manageiq/messaging/kafka/background_job'
        require 'manageiq/messaging/kafka/topic'

        include Common
        include Queue
        include BackgroundJob
        include Topic

        # Encoding name taken from the :encoding connection option; set to
        # 'yaml' in +initialize+ when that option is absent.
        attr_accessor :encoding

        # Commits the given acknowledgement reference.
        #
        # ack_ref - object responding to +commit+.
        #
        # Returns whatever +commit+ returns. An Rdkafka::RdkafkaError is logged
        # as a warning; it is swallowed (nil is returned) when its message
        # mentions "no_offset" and re-raised otherwise.
        def ack(ack_ref)
          begin
            ack_ref.commit
          rescue Rdkafka::RdkafkaError => error
            reason = error.message
            logger.warn("ack failed with error #{reason}")
            # A missing offset is tolerated; anything else goes back to the caller.
            return if reason.match?(/no_offset/)

            raise
          end
        end

        # Closes the admin, producer and consumer handles (in that order) when
        # they are set, and resets each instance variable to nil afterwards.
        #
        # Returns nil.
        def close
          %i[@admin @producer @consumer].each do |handle_ivar|
            instance_variable_get(handle_ivar)&.close
            instance_variable_set(handle_ivar, nil)
          end
          nil
        end

        # Lists the name of every topic reported by the admin handle's metadata.
        #
        # Returns the +:topic_name+ value of each metadata topic entry.
        def topics
          topic_entries = admin.metadata.topics
          topic_entries.map { |entry| entry[:topic_name] }
        end

        private

        # The Rdkafka::Config instance built in +initialize+ from the
        # connection options.
        attr_reader :kafka_client

        # Records the encoding and builds the rdkafka configuration object.
        #
        # options - Hash of connection options. :encoding selects the encoding
        #           ('yaml' when absent); the whole hash is passed through
        #           +rdkafka_connection_opts+ to produce the rdkafka settings.
        def initialize(options)
          requested_encoding = options[:encoding]
          @encoding = requested_encoding || 'yaml'

          # json is loaded lazily, only when that encoding was asked for.
          if @encoding == "json"
            require "json"
          end

          # The logger is installed before the configuration object is created.
          ::Rdkafka::Config.logger = logger
          connection_opts = rdkafka_connection_opts(options)
          @kafka_client = ::Rdkafka::Config.new(connection_opts)
        end

        # Translates the client's connection options into an rdkafka
        # configuration hash.
        #
        # options - Hash of connection options. Keys read here:
        #           :hosts / :host and :port -> "bootstrap.servers"
        #                                       ("host:port" entries joined by commas)
        #           :client_ref              -> "client.id"
        #           :sasl_mechanism          -> "sasl.mechanism" ("PLAIN" when absent)
        #           :username, :password     -> "sasl.username", "sasl.password"
        #           :ssl                     -> "security.protocol"
        #                                       ("SASL_SSL" when truthy, else "PLAINTEXT")
        #           :ca_file                 -> "ssl.ca.location"
        #           :keystore_location,
        #           :keystore_password       -> "ssl.keystore.location", "ssl.keystore.password"
        #           Every other key, apart from :encoding and :protocol, is
        #           copied into the result unchanged and overrides a computed
        #           entry that has the same key.
        #
        # Returns a new Hash. Neither +options+ nor its :hosts array is modified.
        def rdkafka_connection_opts(options)
          # Array() returns the caller's own array when :hosts is already an
          # Array, so the host list must be mapped into a new array; an in-place
          # collect! would append the port to the caller's data on every call
          # (and raise on a frozen array).
          servers = Array(options[:hosts] || options[:host]).map { |host| "#{host}:#{options[:port]}" }

          result = {:"bootstrap.servers" => servers.join(',')}
          result[:"client.id"] = options[:client_ref] if options[:client_ref]

          result[:"sasl.mechanism"]    = options[:sasl_mechanism] || "PLAIN"
          result[:"sasl.username"]     = options[:username] if options[:username]
          result[:"sasl.password"]     = options[:password] if options[:password]
          result[:"security.protocol"] = options[:ssl] ? "SASL_SSL" : "PLAINTEXT"
          result[:"ssl.ca.location"]   = options[:ca_file] if options[:ca_file]
          result[:"ssl.keystore.location"] = options[:keystore_location] if options[:keystore_location]
          result[:"ssl.keystore.password"] = options[:keystore_password] if options[:keystore_password]

          # Anything not consumed above is handed to rdkafka as-is.
          result.merge(options.except(:port, :host, :hosts, :encoding, :protocol, :client_ref, :sasl_mechanism, :username, :password, :ssl, :ca_file, :keystore_location, :keystore_password))
        end
      end
    end
  end
end