bpot/poseidon

View on GitHub
lib/poseidon/partition_consumer.rb

Summary

Maintainability
B
4 hrs
Test Coverage
module Poseidon
  # A primitive Kafka Consumer which operates on a specific broker, topic and partition.
  #
  # Example in the README.
  #
  # @api public
  class PartitionConsumer
    # The offset of the latest message the broker received for this partition.
    # Useful for knowing how far behind the consumer is. This value is only
    # as recent as the last successful fetch call (nil before the first one).
    attr_reader :highwater_mark

    # Host and port of the broker this consumer talks to.
    attr_reader :host, :port

    # The offset as currently stored. Until it has been resolved against the
    # broker this may still be the Symbol or negative Integer passed to
    # #initialize; use #next_offset for a resolved Integer.
    attr_reader :offset

    # Name of the topic being consumed.
    attr_reader :topic

    # Returns a consumer pointing at the lead broker for the partition.
    #
    # Eventually this will be replaced by higher level consumer functionality,
    # this is a stop-gap.
    #
    # @param [String] client_id
    # @param seed_brokers Brokers used to bootstrap the metadata lookup
    #   (whatever BrokerPool.open accepts).
    # @param [String] topic
    # @param [Integer] partition
    # @param [Integer,Symbol] offset See #initialize.
    # @param [Hash] options See #initialize. :socket_timeout_ms (default 10000
    #   here) is also used for the metadata lookup.
    # @return [PartitionConsumer]
    def self.consumer_for_partition(client_id, seed_brokers, topic, partition, offset, options = {})
      metadata_timeout_ms = options[:socket_timeout_ms] || 10_000

      # NOTE(review): relies on BrokerPool.open returning the block's value
      # (the lead broker) — confirm against BrokerPool.
      broker = BrokerPool.open(client_id, seed_brokers, metadata_timeout_ms) do |broker_pool|
        cluster_metadata = ClusterMetadata.new
        cluster_metadata.update(broker_pool.fetch_metadata([topic]))

        cluster_metadata.lead_broker_for_partition(topic, partition)
      end

      new(client_id, broker.host, broker.port, topic, partition, offset, options)
    end

    # Create a new consumer which reads the specified topic and partition from
    # the host.
    #
    # @param [String] client_id  Used to identify this client; should be unique.
    # @param [String] host
    # @param [Integer] port
    # @param [String] topic Topic to read from
    # @param [Integer] partition Partitions are zero indexed.
    # @param [Integer,Symbol] offset
    #   Offset to start reading from. A negative Integer is interpreted relative
    #   to the latest offset (e.g. -10 starts ten offsets before it); if that
    #   turns out to be out of range, the first fetch falls back to the earliest
    #   offset. There are a couple special offsets which can be passed as symbols:
    #     :earliest_offset       Start reading from the first offset the server has.
    #     :latest_offset         Start reading from the latest offset the server has.
    #
    # @param [Hash] options
    #   These options (except :socket_timeout_ms) can be overridden in each
    #   individual fetch command. The hash is not modified.
    #
    # @option options [Integer] :max_bytes
    #   Maximum number of bytes to fetch
    #   Default: 1048576 (1MB)
    #
    # @option options [Integer] :max_wait_ms
    #   How long to block until the server sends us data.
    #   NOTE: This is only enforced if min_bytes is > 0.
    #   Default: 10000 (10s)
    #
    # @option options [Integer] :min_bytes
    #   Smallest amount of data the server should send us.
    #   Default: 1 (Send us data as soon as it is ready)
    #
    # @option options [Integer] :socket_timeout_ms
    #   How long to wait for reply from server. Must not be lower than max_wait_ms.
    #   Default: max_wait_ms + 10000 (20s with the default max_wait_ms)
    #
    # @raise [ArgumentError] for an unknown special offset Symbol, an unknown
    #   option, or a socket_timeout_ms lower than max_wait_ms.
    #
    # @api public
    def initialize(client_id, host, port, topic, partition, offset, options = {})
      # Validate before any connection object is created.
      if Symbol === offset && !%i[earliest_offset latest_offset].include?(offset)
        raise ArgumentError, "Unknown special offset type: #{offset}"
      end

      @host = host
      @port = port
      @topic = topic
      @partition = partition
      @offset = offset

      handle_options(options)

      @connection = Connection.new(host, port, client_id, @socket_timeout_ms)
    end

    # Fetch messages from the broker.
    #
    # Any option left out falls back to the value given to #initialize. The
    # options hash is not modified.
    #
    # @param [Hash] options
    #
    # @option options [Integer] :max_bytes
    #   Maximum number of bytes to fetch
    #
    # @option options [Integer] :max_wait_ms
    #   How long to block until the server sends us data.
    #
    # @option options [Integer] :min_bytes
    #   Smallest amount of data the server should send us.
    #
    # @return [Array<FetchedMessage>] possibly empty
    # @raise [ArgumentError] for unknown options
    # @raise the error class mapped from the broker's partition error code
    #
    # @api public
    def fetch(options = {})
      overrides = options.dup
      fetch_max_wait  = overrides.delete(:max_wait_ms) || max_wait_ms
      fetch_max_bytes = overrides.delete(:max_bytes) || max_bytes
      fetch_min_bytes = overrides.delete(:min_bytes) || min_bytes

      unless overrides.empty?
        raise ArgumentError, "Unknown options: #{overrides.keys.inspect}"
      end

      fetch_messages(fetch_max_wait, fetch_max_bytes, fetch_min_bytes)
    end

    # @return [Integer] next offset we will fetch (resolving a Symbol or
    #   negative offset against the broker first, if needed)
    #
    # @api public
    def next_offset
      resolve_offset_if_necessary
      @offset
    end

    # Close the connection to the kafka broker
    #
    # @return [Nil]
    #
    # @api public
    def close
      @connection.close
      nil
    end

    private

    # Performs one fetch round trip with fully resolved settings, retrying once
    # from the earliest offset when a relative offset proved out of range.
    # The same settings are reused on retry so caller overrides are kept.
    def fetch_messages(fetch_max_wait, fetch_max_bytes, fetch_min_bytes)
      topic_fetches = build_topic_fetch_request(fetch_max_bytes)
      fetch_response = @connection.fetch(fetch_max_wait, fetch_min_bytes, topic_fetches)
      topic_response = fetch_response.topic_fetch_responses.first
      partition_response = topic_response.partition_fetch_responses.first

      error_code = partition_response.error
      unless error_code == Errors::NO_ERROR_CODE
        # NOTE(review): if error_code is missing from ERROR_CODES this becomes
        # `raise nil` (TypeError) — presumably ERROR_CODES is exhaustive; confirm.
        error_class = Errors::ERROR_CODES[error_code]

        # A relative (negative) offset that is still negative after resolution
        # cannot be served; restart from the earliest available offset instead.
        # After that resolution @offset is >= 0, so this retries at most once.
        if @offset < 0 && error_class == Errors::OffsetOutOfRange
          @offset = :earliest_offset
          return fetch_messages(fetch_max_wait, fetch_max_bytes, fetch_min_bytes)
        end

        raise error_class
      end

      @highwater_mark = partition_response.highwater_mark_offset
      messages = partition_response.message_set.flatten.map do |m|
        FetchedMessage.new(topic_response.topic, m.value, m.key, m.offset)
      end
      # Advance past the last message received so the next fetch continues there.
      @offset = messages.last.offset + 1 unless messages.empty?
      messages
    end

    # Reads and validates constructor options without mutating the caller's hash.
    def handle_options(options)
      opts = options.dup
      @max_bytes         = opts.delete(:max_bytes) || 1024 * 1024
      @min_bytes         = opts.delete(:min_bytes) || 1
      @max_wait_ms       = opts.delete(:max_wait_ms) || 10_000
      @socket_timeout_ms = opts.delete(:socket_timeout_ms) || @max_wait_ms + 10_000

      # Equal values are accepted; only a strictly lower socket timeout is rejected.
      if @socket_timeout_ms < @max_wait_ms
        raise ArgumentError, "Setting socket_timeout_ms should be higher than max_wait_ms"
      end

      unless opts.empty?
        raise ArgumentError, "Unknown options: #{opts.keys.inspect}"
      end
    end

    def max_wait_ms
      @max_wait_ms
    end

    def max_bytes
      @max_bytes
    end

    def min_bytes
      @min_bytes
    end

    # Replaces a Symbol or negative @offset with a concrete Integer offset from
    # the broker's Offset API. No-op when @offset is already a non-negative Integer.
    def resolve_offset_if_necessary
      return unless Symbol === @offset || @offset < 0

      # Kafka's Offset API uses time -2 for "earliest" and -1 for "latest";
      # negative relative offsets are counted back from the latest offset.
      protocol_offset = @offset == :earliest_offset ? -2 : -1

      topic_offset_responses = @connection.offset(build_topic_offset_request(protocol_offset))
      partition_offset = topic_offset_responses.first.partition_offsets.first
      if partition_offset.error != Errors::NO_ERROR_CODE
        raise Errors::ERROR_CODES[partition_offset.error]
      end

      offset_struct = partition_offset.offsets.first

      @offset = if offset_struct.nil?
        # No offset reported for the partition; start from 0.
        0
      elsif @offset.is_a?(Integer) && @offset < 0
        # Relative offset: count back from the latest offset.
        offset_struct.offset + @offset
      else
        offset_struct.offset
      end
    end

    # Builds a single-topic, single-partition Offset API request.
    def build_topic_offset_request(protocol_offset)
      partition_offset_request = Protocol::PartitionOffsetRequest.new(
        @partition,
        protocol_offset,
        1 # max_number_of_offsets: only one offset is needed
      )

      [Protocol::TopicOffsetRequest.new(topic, [partition_offset_request])]
    end

    # Builds a single-topic, single-partition Fetch request starting at #next_offset.
    def build_topic_fetch_request(fetch_max_bytes)
      partition_fetches = [Protocol::PartitionFetch.new(@partition, next_offset, fetch_max_bytes)]
      [Protocol::TopicFetch.new(topic, partition_fetches)]
    end
  end
end