phobos/phobos

View on GitHub
lib/phobos/listener.rb

Summary

Maintainability
A
1 hr
Test Coverage
# frozen_string_literal: true

module Phobos
  # rubocop:disable Metrics/ParameterLists, Metrics/ClassLength

  # Consumes one Kafka topic on behalf of a single handler class.
  #
  # Each listener creates its own Kafka client. {#start} subscribes to the topic and
  # blocks the calling thread while batches or messages are handed to the handler
  # (see the +delivery+ option); {#stop} asks the running loop to finish, after which
  # the consumer, the handler's producer binding and the Kafka client are released.
  class Listener
    include Phobos::Instrumentation
    include Phobos::Log

    # Default value of the +max_bytes_per_partition+ subscription option.
    DEFAULT_MAX_BYTES_PER_PARTITION = 1_048_576 # 1 MB
    # Supported +delivery+ modes. This class does not check against it: any value
    # other than 'batch' or 'inline_batch' is consumed one message at a time.
    DELIVERY_OPTS = %w[batch message inline_batch].freeze

    # @return [String] consumer group joined by this listener
    attr_reader :group_id
    # @return [String] topic consumed by this listener
    attr_reader :topic
    # @return [String] 6-character random hex id included in log/instrumentation metadata
    attr_reader :id
    # @return [Class<BasicObject>] handler class receiving the consumed data
    attr_reader :handler_class
    # @return [Encoding, nil] encoding named by +force_encoding+, or nil when none was given
    attr_reader :encoding
    # @return [Object, nil] Kafka consumer created by {#start}; nil until the listener starts
    attr_reader :consumer

    # @param handler [Class<BasicObject>] handler class; its +start+ and +stop+ hooks run
    #   around consumption and, when it includes Phobos::Producer, its producer is bound
    #   to this listener's Kafka client while the listener runs
    # @param group_id [String] consumer group to join
    # @param topic [String] topic to subscribe to
    # @param min_bytes [Integer, nil] passed to each_batch/each_message when set
    # @param max_wait_time [Integer, nil] passed to each_batch/each_message when set
    # @param force_encoding [String, Symbol, nil] name of an Encoding constant (e.g. "UTF_8")
    # @param start_from_beginning [Boolean] subscription option
    # @param backoff [Object, nil] settings handed to Phobos.create_exponential_backoff
    # @param delivery [String, Symbol] 'batch', 'inline_batch' or 'message'; any value other
    #   than the first two is consumed message by message
    # @param max_bytes_per_partition [Integer] subscription option
    # @param session_timeout [Integer, nil] consumer option; overrides the global consumer
    #   config when set
    # @param offset_commit_interval [Integer, nil] consumer option; overrides the global
    #   consumer config when set
    # @param heartbeat_interval [Integer, nil] consumer option; overrides the global
    #   consumer config when set
    # @param offset_commit_threshold [Integer, nil] consumer option; overrides the global
    #   consumer config when set
    # @param offset_retention_time [Integer, nil] consumer option; overrides the global
    #   consumer config when set
    # rubocop:disable Metrics/MethodLength
    def initialize(handler:, group_id:, topic:, min_bytes: nil, max_wait_time: nil,
                   force_encoding: nil, start_from_beginning: true, backoff: nil,
                   delivery: 'batch', max_bytes_per_partition: DEFAULT_MAX_BYTES_PER_PARTITION,
                   session_timeout: nil, offset_commit_interval: nil,
                   heartbeat_interval: nil, offset_commit_threshold: nil,
                   offset_retention_time: nil)
      @id = SecureRandom.hex[0...6]
      @handler_class = handler
      @group_id = group_id
      @topic = topic
      @backoff = backoff
      @delivery = delivery.to_s
      @subscribe_opts = {
        start_from_beginning: start_from_beginning, max_bytes_per_partition: max_bytes_per_partition
      }
      # Only explicitly given options are kept so they do not mask the global consumer config.
      @kafka_consumer_opts = compact(
        session_timeout: session_timeout, offset_retention_time: offset_retention_time,
        offset_commit_interval: offset_commit_interval, heartbeat_interval: heartbeat_interval,
        offset_commit_threshold: offset_commit_threshold
      )
      @encoding = Encoding.const_get(force_encoding.to_sym) if force_encoding
      @message_processing_opts = compact(min_bytes: min_bytes, max_wait_time: max_wait_time)
      @kafka_client = Phobos.create_kafka_client(:consumer)
      @producer_enabled = @handler_class.ancestors.include?(Phobos::Producer)
    end
    # rubocop:enable Metrics/MethodLength

    # Subscribes to the topic and consumes it until the consumer loop ends or {#stop} is
    # called. Blocks the calling thread; resources are released on exit in every case.
    #
    # @raise [Kafka::ProcessingError] re-raised after the 'listener.retry_aborted' event
    # @return [void]
    def start
      @signal_to_stop = false
      start_listener

      begin
        start_consumer_loop
      rescue Kafka::ProcessingError, Phobos::AbortError => e
        # Abort is an exception to prevent the consumer from committing the offset.
        # Since "listener" had a message being retried while "stop" was called
        # it's wise to not commit the batch offset to avoid data loss. This will
        # cause some messages to be reprocessed
        instrument('listener.retry_aborted', listener_metadata) do
          log_info('Retry loop aborted, listener is shutting down', listener_metadata)
        end
        # NOTE(review): a Kafka::ProcessingError is reported as an aborted retry above and
        # then propagated to the caller; only Phobos::AbortError is swallowed here.
        raise e if e.is_a?(Kafka::ProcessingError)
      end
    ensure
      stop_listener
    end

    # Requests shutdown: stops the Kafka consumer (if any) and flags the listener so that
    # {#should_stop?} turns true. Calling it again after that is a no-op.
    #
    # @return [void]
    def stop
      return if should_stop?

      instrument('listener.stopping', listener_metadata) do
        log_info('Listener stopping', listener_metadata)
        @consumer&.stop
        @signal_to_stop = true
      end
    end

    # @return [Object] a backoff built by Phobos.create_exponential_backoff from the
    #   +backoff+ settings given to the constructor
    def create_exponential_backoff
      Phobos.create_exponential_backoff(@backoff)
    end

    # @return [Boolean] true once {#stop} has been called (reset by {#start})
    def should_stop?
      @signal_to_stop == true
    end

    # Delegates to the consumer's send_heartbeat_if_necessary (presumably so long-running
    # processing keeps the group membership alive), unless a stop was requested.
    #
    # @raise [Phobos::AbortError] when the listener has been asked to stop
    # @return [void]
    def send_heartbeat_if_necessary
      raise Phobos::AbortError if should_stop?

      @consumer&.send_heartbeat_if_necessary
    end

    private

    # @return [Hash] identifying data attached to every log line and instrumentation event
    def listener_metadata
      { listener_id: id, group_id: group_id, topic: topic, handler: handler_class.to_s }
    end

    # Creates and subscribes the consumer, binds the handler's producer (when enabled) to
    # this listener's Kafka client and runs the handler's +start+ hook.
    def start_listener
      instrument('listener.start', listener_metadata) do
        @consumer = create_kafka_consumer
        @consumer.subscribe(topic, **@subscribe_opts)

        # This is done here because the producer client is bound to the current thread and
        # since "start" blocks a thread might be used to call it
        @handler_class.producer.configure_kafka_client(@kafka_client) if @producer_enabled

        instrument('listener.start_handler', listener_metadata) do
          @handler_class.start(@kafka_client)
        end
        log_info('Listener started', listener_metadata)
      end
    end

    # Runs the handler's +stop+ hook and then releases the Kafka resources. The release
    # happens even when the hook raises, so a failing handler cannot leak the consumer,
    # the producer binding or the client connection; the hook's error still propagates.
    def stop_listener
      instrument('listener.stop', listener_metadata) do
        begin
          instrument('listener.stop_handler', listener_metadata) { @handler_class.stop }
        ensure
          release_kafka_resources
        end

        log_info('Listener stopped', listener_metadata) if should_stop?
      end
    end

    # Stops the consumer, shuts down and detaches the handler's producer (when enabled)
    # and closes the Kafka client. The client is closed even if an earlier step raises.
    def release_kafka_resources
      @consumer&.stop

      if @producer_enabled
        @handler_class.producer.async_producer_shutdown
        @handler_class.producer.configure_kafka_client(nil)
      end
    ensure
      @kafka_client.close
    end

    # Dispatches on the delivery mode. Only 'batch' and 'inline_batch' are special-cased;
    # every other value (including 'message') is consumed one message at a time.
    def start_consumer_loop
      case @delivery
      when 'batch'
        consume_each_batch
      when 'inline_batch'
        consume_each_batch_inline
      else
        consume_each_message
      end
    end

    # Hands each fetched batch to Phobos::Actions::ProcessBatch.
    def consume_each_batch
      @consumer.each_batch(**@message_processing_opts) do |batch|
        execute_processor(
          Phobos::Actions::ProcessBatch.new(
            listener: self,
            batch: batch,
            listener_metadata: listener_metadata
          )
        )
        # Leaves the consumer loop (returning from this method) once a stop was requested.
        return nil if should_stop?
      end
    end

    # Hands each fetched batch to Phobos::Actions::ProcessBatchInline.
    def consume_each_batch_inline
      @consumer.each_batch(**@message_processing_opts) do |batch|
        execute_processor(
          Phobos::Actions::ProcessBatchInline.new(
            listener: self,
            batch: batch,
            metadata: listener_metadata
          )
        )
        # Leaves the consumer loop (returning from this method) once a stop was requested.
        return nil if should_stop?
      end
    end

    # Hands each fetched message to Phobos::Actions::ProcessMessage.
    def consume_each_message
      @consumer.each_message(**@message_processing_opts) do |message|
        execute_processor(
          Phobos::Actions::ProcessMessage.new(
            listener: self,
            message: message,
            listener_metadata: listener_metadata
          )
        )
        # Leaves the consumer loop (returning from this method) once a stop was requested.
        return nil if should_stop?
      end
    end

    # Runs one processing action and logs its metadata at debug level afterwards.
    def execute_processor(processor)
      processor.execute
      log_debug('Committed offset', processor.metadata)
    end

    # Builds the Kafka consumer from the global consumer settings, restricted to the keys
    # listed in Constants::KAFKA_CONSUMER_OPTS and overridden by this listener's options.
    def create_kafka_consumer
      configs = Phobos.config.consumer_hash.select do |key, _value|
        Constants::KAFKA_CONSUMER_OPTS.include?(key)
      end
      configs.merge!(@kafka_consumer_opts)
      @kafka_client.consumer(**{ group_id: group_id }.merge(configs))
    end

    # @param hash [Hash]
    # @return [Hash] a copy of +hash+ without its nil-valued entries (the argument is not modified)
    def compact(hash)
      hash.reject { |_key, value| value.nil? }
    end
  end
  # rubocop:enable Metrics/ParameterLists, Metrics/ClassLength
end