karafka/karafka

View on GitHub
lib/karafka/admin.rb

Summary

Maintainability
A
0 mins
Test Coverage
# frozen_string_literal: true

module Karafka
  # Admin actions that we can perform via Karafka on our Kafka cluster
  #
  # @note It always initializes a new admin instance as we want to ensure it is always closed
  #   Since admin actions are not performed that often, that should be ok.
  #
  # @note It always uses the primary defined cluster and does not support multi-cluster work.
  #   Cluster on which operations are performed can be changed via `admin.kafka` config, however
  #   there is no multi-cluster runtime support.
  module Admin
    # Approximate number of seconds in one hundred years (365.25 days per year, hence a Float)
    # Used as a time delta for references that do not have to be accurate but need to be big,
    # for example to build a "far in the past" or "far in the future" point in time
    HUNDRED_YEARS = 100 * 365.25 * 24 * 60 * 60

    private_constant :HUNDRED_YEARS

    class << self
      # Reads up to `count` messages from a single partition of a topic
      #
      # @param name [String, Symbol] topic name
      # @param partition [Integer] partition
      # @param count [Integer] maximum number of messages we want to get
      # @param start_offset [Integer, Time] offset from which reading should start. With `-1`
      #   (default) the last `count` messages below the high watermark are read. Values below
      #   `-1` move the window further back. A time is first resolved into an offset.
      # @param settings [Hash] kafka extra settings (optional)
      #
      # @return [Array<Karafka::Messages::Message>] array with messages
      def read_topic(name, partition, count, start_offset = -1, settings = {})
        fetched = []
        assignment = Rdkafka::Consumer::TopicPartitionList.new

        with_consumer(settings) do |consumer|
          # Time based references need to become a regular offset before any math
          start_offset = resolve_offset(consumer, name.to_s, partition, start_offset)

          low_mark, high_mark = consumer.query_watermark_offsets(name, partition)

          if start_offset.negative?
            # Negative reference means "N messages back from the high watermark"
            start_offset = high_mark - count - start_offset.abs + 1
            # When we would end up before the beginning, fall back to the low watermark
            start_offset = low_mark if start_offset.negative?
          end

          # First element sits on the start offset, hence `count - 1` for the upper boundary
          wanted = (start_offset..start_offset + (count - 1))
          # Theoretical availability only. Log retention and compaction are not considered here
          available = (low_mark..(high_mark - 1))
          # Keep only the wanted offsets that fit between the watermarks
          readable = wanted.select { |candidate| available.include?(candidate) }

          start_offset = readable.first
          count = readable.size

          assignment.add_topic_and_partitions_with_offsets(name, partition => start_offset)
          consumer.assign(assignment)

          # Poll until we have everything we can get or until the partition has nothing more
          loop do
            break unless fetched.size < count

            polled = consumer.poll(200)

            next unless polled

            # A message outside of the readable offsets means we went beyond what was requested
            break unless readable.include?(polled.offset)

            fetched << polled
          rescue Rdkafka::RdkafkaError => e
            # Anything other than the end of partition is a real problem
            raise e unless e.code == :partition_eof

            break
          end
        end

        # Topic from the routing is used when it can be matched. Otherwise a dummy one is built,
        # which happens when admin API reads topics that are not part of the routing
        routing_topic = ::Karafka::Routing::Router.find_or_initialize_by_name(name)

        fetched.map! { |raw| Messages::Builders::Message.call(raw, routing_topic, Time.now) }
      end

      # Creates a Kafka topic and waits until the creation is confirmed or observable
      #
      # @param name [String] topic name
      # @param partitions [Integer] number of partitions we expect
      # @param replication_factor [Integer] number of replicas
      # @param topic_config [Hash] topic config details as described here:
      #   https://kafka.apache.org/documentation/#topicconfigs
      def create_topic(name, partitions, replication_factor, topic_config = {})
        with_admin do |admin|
          creation = admin.create_topic(name, partitions, replication_factor, topic_config)

          # Blocks on the creation handle for at most the configured admin max wait time
          await_creation = -> { creation.wait(max_wait_timeout: app_config.admin.max_wait_time) }
          # Topic being listed in the cluster is the proof that the creation went through
          topic_listed = -> { topics_names.include?(name) }

          with_re_wait(await_creation, topic_listed)
        end
      end

      # Deletes a given topic and waits until the removal is confirmed or observable
      #
      # @param name [String] topic name
      def delete_topic(name)
        with_admin do |admin|
          removal = admin.delete_topic(name)

          # Blocks on the removal handle for at most the configured admin max wait time
          await_removal = -> { removal.wait(max_wait_timeout: app_config.admin.max_wait_time) }
          # Topic no longer being listed in the cluster is the proof that it is gone
          topic_gone = -> { !topics_names.include?(name) }

          with_re_wait(await_removal, topic_gone)
        end
      end

      # Adds partitions to a given topic and waits until the change is confirmed or observable
      #
      # @param name [String] topic name
      # @param partitions [Integer] total number of partitions we expect to end up with
      def create_partitions(name, partitions)
        with_admin do |admin|
          expansion = admin.create_partitions(name, partitions)

          # Blocks on the operation handle for at most the configured admin max wait time
          await_expansion = -> { expansion.wait(max_wait_timeout: app_config.admin.max_wait_time) }
          # Topic reporting at least the requested partitions count proves the change is there
          partitions_present = -> { topic_info(name).fetch(:partition_count) >= partitions }

          with_re_wait(await_expansion, partitions_present)
        end
      end

      # Moves the offset on a given consumer group and provided topic to the requested location
      #
      # @param consumer_group_id [String] id of the consumer group for which we want to move the
      #   existing offset
      # @param topics_with_partitions_and_offsets [Hash] Hash with list of topics and settings to
      #   where to move given consumer. It allows us to move particular partitions or whole topics
      #   if we want to reset all partitions to for example a point in time. This hash (including
      #   nested per partition hashes) is never modified and may be frozen.
      #
      # @raise [Errors::InvalidTimeBasedOffsetError] when a time based location could not be
      #   resolved for one of the partitions
      #
      # @note This method should **not** be executed on a running consumer group as it creates a
      #   "fake" consumer and uses it to move offsets.
      #
      # @note Positions can be: a numeric offset, a `Time`, `'earliest'` / `:earliest`,
      #   `'latest'` / `:latest` or `false` (treated the same as `'earliest'`).
      #
      # @example Move a single topic partition nr 1 offset to 100
      #   Karafka::Admin.seek_consumer_group('group-id', { 'topic' => { 1 => 100 } })
      #
      # @example Move offsets on all partitions of a topic to 100
      #   Karafka::Admin.seek_consumer_group('group-id', { 'topic' => 100 })
      #
      # @example Move offset to 5 seconds ago on partition 2
      #   Karafka::Admin.seek_consumer_group('group-id', { 'topic' => { 2 => 5.seconds.ago } })
      #
      # @example Move to the earliest offset on all the partitions of a topic
      #   Karafka::Admin.seek_consumer_group('group-id', { 'topic' => 'earliest' })
      #
      # @example Move to the latest (high-watermark) offset on all the partitions of a topic
      #   Karafka::Admin.seek_consumer_group('group-id', { 'topic' => 'latest' })
      #
      # @example Move offset of a single partition to earliest
      #   Karafka::Admin.seek_consumer_group('group-id', { 'topic' => { 1 => 'earliest' } })
      #
      # @example Move offset of a single partition to latest
      #   Karafka::Admin.seek_consumer_group('group-id', { 'topic' => { 1 => 'latest' } })
      def seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets)
        tpl_base = {}

        # Normalize the data so we always have all partitions and topics in the same format
        # That is in a format where we have topics and all partitions with their per partition
        # assigned offsets
        topics_with_partitions_and_offsets.each do |topic, partitions_with_offsets|
          if partitions_with_offsets.is_a?(Hash)
            # We work on a copy because positions are remapped in place below and we do not want
            # to alter (or fail on a frozen) hash that belongs to the caller
            tpl_base[topic] = partitions_with_offsets.dup
          else
            tpl_base[topic] = {}

            # Single position for a whole topic means the same position on each partition
            topic_info(topic)[:partition_count].times do |partition|
              tpl_base[topic][partition] = partitions_with_offsets
            end
          end
        end

        tpl_base.each_value do |partitions|
          partitions.transform_values! do |position|
            # Support both symbol and string based references
            casted_position = position.is_a?(Symbol) ? position.to_s : position

            # This remap allows us to transform some special cases in a reference that can be
            # understood by Kafka
            case casted_position
            # Earliest is not always 0. When compacting/deleting it can be much later, that's why
            # we fetch the oldest possible offset
            when 'earliest'
              Time.now - HUNDRED_YEARS
            # Latest will always be the high-watermark offset and we can get it just by getting
            # a future position
            when 'latest'
              Time.now + HUNDRED_YEARS
            # Same as `'earliest'`
            when false
              Time.now - HUNDRED_YEARS
            # Regular offset case
            else
              position
            end
          end
        end

        tpl = Rdkafka::Consumer::TopicPartitionList.new
        # In case of time based location, we need to do a pre-resolution, that's why we keep it
        # separately
        time_tpl = Rdkafka::Consumer::TopicPartitionList.new

        # Distribute properly the offset type
        # NOTE(review): the code below writes into the hash returned by `#to_h`, so it relies on
        # `#to_h` exposing the internal storage of the list - confirm against rdkafka
        tpl_base.each do |topic, partitions_with_offsets|
          partitions_with_offsets.each do |partition, offset|
            target = offset.is_a?(Time) ? time_tpl : tpl
            # We reverse and uniq to make sure that potentially duplicated references are removed
            # in such a way that the newest stays
            target.to_h[topic] ||= []
            target.to_h[topic] << Rdkafka::Consumer::Partition.new(partition, offset)
            target.to_h[topic].reverse!
            target.to_h[topic].uniq!(&:partition)
            target.to_h[topic].reverse!
          end
        end

        # We impersonate the requested consumer group so the commit lands on its offsets
        settings = { 'group.id': consumer_group_id }

        with_consumer(settings) do |consumer|
          # If we have any time based stuff to resolve, we need to do it prior to commits
          unless time_tpl.empty?
            real_offsets = consumer.offsets_for_times(time_tpl)

            real_offsets.to_h.each do |name, results|
              results.each do |result|
                raise(Errors::InvalidTimeBasedOffsetError) unless result

                partition = result.partition

                # Negative offset means we're beyond last message and we need to query for the
                # high watermark offset to get the most recent offset and move there
                if result.offset.negative?
                  _, offset = consumer.query_watermark_offsets(name, result.partition)
                else
                  # If we get an offset, it means there existed a message close to this time
                  # location
                  offset = result.offset
                end

                # Since now we have proper offsets, we can add this to the final tpl for commit
                tpl.to_h[name] ||= []
                tpl.to_h[name] << Rdkafka::Consumer::Partition.new(partition, offset)
                tpl.to_h[name].reverse!
                tpl.to_h[name].uniq!(&:partition)
                tpl.to_h[name].reverse!
              end
            end
          end

          consumer.commit_offsets(tpl, async: false)
        end
      end

      # Removes given consumer group (if exists) and waits for the operation result
      #
      # @param consumer_group_id [String] consumer group name
      #
      # @note This method should not be used on a running consumer group as it will not yield any
      #   results.
      def delete_consumer_group(consumer_group_id)
        with_admin do |admin|
          # Waits on the removal handle for at most the configured admin max wait time
          admin
            .delete_group(consumer_group_id)
            .wait(max_wait_timeout: app_config.admin.max_wait_time)
        end
      end

      # Queries the low and high watermark offsets of a given topic partition
      #
      # @param name [String, Symbol] topic name
      # @param partition [Integer] partition
      # @return [Array<Integer, Integer>] low watermark offset and high watermark offset
      def read_watermark_offsets(name, partition)
        with_consumer { |consumer| consumer.query_watermark_offsets(name, partition) }
      end

      # Reads committed offsets and lags of consumer groups for their topics
      #
      # @param consumer_groups_with_topics [Hash<String, Array<String>>] consumer groups names
      #   with arrays of topics to query per consumer group. When empty, consumer groups and
      #   their topics are taken from the routing
      # @param active_topics_only [Boolean] used only when topics come from the routing. If set
      #   to false, topics marked as inactive in the routing are selected as well
      # @return [Hash<String, Hash<String, Hash<Integer, Hash>>>] hash where the top level keys
      #   are the consumer groups, below are topics and inside partitions with `:offset` and
      #   `:lag` details
      #
      # @note For topics that do not exist, topic details will be set to an empty hash
      #
      # @note For topics that exist but were never consumed by a given CG we set `-1` as lag and
      #   the offset on each of the partitions that were not consumed.
      #
      # @note This lag reporting is for committed lags and is "Kafka-centric", meaning that this
      #   represents lags from Kafka perspective and not the consumer. They may differ.
      def read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true)
        # Topics present in the cluster with their partitions counts. Thanks to that we never
        # query for topics that do not exist and we know how many partitions to cover
        partition_counts = cluster_info
                           .topics
                           .map { |details| [details[:topic_name], details[:partition_count]] }
                           .to_h
                           .freeze

        # Without explicit expectations, consumer groups and topics come from the routing
        if consumer_groups_with_topics.empty?
          routing_pairs = Karafka::App.routes.map do |group|
            selected = group.topics.select { |route| !active_topics_only || route.active? }

            [group.id, selected.map(&:name)]
          end

          consumer_groups_with_topics = routing_pairs.to_h
        end

        # Deep enough copy that we can prune. The original request stays untouched because it
        # is needed later for backfilling
        queryable = consumer_groups_with_topics.transform_values(&:dup)

        # Only topics that exist can be queried
        queryable.each_value do |requested|
          requested.keep_if { |topic| partition_counts.key?(topic) }
        end

        lags_by_group = Hash.new { |store, group_id| store[group_id] = {} }
        offsets_by_group = Hash.new { |store, group_id| store[group_id] = {} }

        queryable.each do |group_id, topics|
          # Nothing to ask about when none of the requested topics exists
          next if topics.empty?

          tpl = Rdkafka::Consumer::TopicPartitionList.new

          with_consumer('group.id': group_id) do |consumer|
            topics.each { |topic| tpl.add_topic(topic, partition_counts[topic]) }

            committed = consumer.committed(tpl)

            committed.to_h.each do |topic, partitions|
              offsets_by_group[group_id][topic] = partitions.each_with_object({}) do |part, acc|
                # -1 when no offset is stored
                acc[part.partition] = part.offset || -1
              end
            end

            consumer.lag(committed).each do |topic, partitions_lags|
              lags_by_group[group_id][topic] = partitions_lags
            end
          end
        end

        consumer_groups_with_topics.each do |group_id, topics|
          # Accessing the group materializes its entry even when it has no topics
          group_lags = lags_by_group[group_id]

          topics.each do |topic|
            topic_lags = (group_lags[topic] ||= {})

            next unless partition_counts.key?(topic)

            # Consumer group could have consumed for example only one partition out of 20, the
            # rest needs to get -1
            partition_counts[topic].times { |partition_id| topic_lags[partition_id] ||= -1 }
          end
        end

        merged = Hash.new { |store, group_id| store[group_id] = {} }

        # Combine lags and offsets into per partition details
        lags_by_group.each do |group_id, topics|
          topics.each do |topic, partitions|
            merged[group_id][topic] = partitions.each_with_object({}) do |(partition, lag), acc|
              acc[partition] = {
                offset: offsets_by_group.fetch(group_id).fetch(topic).fetch(partition),
                lag: lag
              }
            end
          end
        end

        merged
      end

      # Fetches the metadata using a short-lived admin instance
      #
      # @return [Rdkafka::Metadata] cluster metadata info
      def cluster_info
        with_admin { |admin| admin.metadata }
      end

      # Fetches metadata of a single topic
      #
      # @param topic_name [String] name of the topic we're interested in
      # @return [Hash] topic metadata info hash
      # @raise [Rdkafka::RdkafkaError] `unknown_topic_or_part` if requested topic is not found
      #
      # @note This query is much more efficient than doing a full `#cluster_info` + topic lookup
      #   because it does not have to query for all the topics data but just the topic we're
      #   interested in
      def topic_info(topic_name)
        with_admin do |admin|
          # Metadata is requested only for the topic we care about
          scoped_topics = admin.metadata(topic_name).topics

          scoped_topics.find { |details| details[:topic_name] == topic_name }
        end
      end

      # Creates consumer instance and yields it. After usage it closes the consumer instance
      # This API can be used in other pieces of code and allows for low-level consumer usage
      #
      # @param settings [Hash] extra settings to customize consumer
      # @yieldparam proxy [Karafka::Connection::Proxy] proxy wrapping the started consumer
      # @return [Object] result of the provided block
      #
      # @note We always ship and yield a proxied consumer because admin API performance is not
      #   that relevant. That is, there are no high frequency calls that would have to be delegated
      #
      # @note The oauth token refresh callback is always removed, even if closing the consumer
      #   fails. The closing error is still raised.
      def with_consumer(settings = {})
        # Unique id under which the oauth callback of this particular instance is registered
        bind_id = SecureRandom.uuid

        # Consumer is started manually, so the oauth callback is bound before it runs
        consumer = config(:consumer, settings).consumer(native_kafka_auto_start: false)
        bind_oauth(bind_id, consumer)

        consumer.start
        proxy = ::Karafka::Connection::Proxy.new(consumer)
        yield(proxy)
      ensure
        begin
          # Always unsubscribe consumer just to be sure, that no metadata requests are running
          # when we close the consumer. This in theory should prevent from some race-conditions
          # that originate from librdkafka
          begin
            consumer&.unsubscribe
          # Ignore any errors and continue to close consumer despite them
          rescue Rdkafka::RdkafkaError
            nil
          end

          consumer&.close
        ensure
          # Needs to run also when closing raised, otherwise the callback (and the instance it
          # references) would stay registered forever
          unbind_oauth(bind_id)
        end
      end

      # Creates admin instance and yields it. After usage it closes the admin instance
      #
      # @yieldparam proxy [Karafka::Connection::Proxy] proxy wrapping the started admin
      # @return [Object] result of the provided block
      #
      # @note The oauth token refresh callback is always removed, even if closing the admin
      #   fails. The closing error is still raised.
      def with_admin
        # Unique id under which the oauth callback of this particular instance is registered
        bind_id = SecureRandom.uuid

        # Admin is started manually, so the oauth callback is bound before it runs
        admin = config(:producer, {}).admin(native_kafka_auto_start: false)
        bind_oauth(bind_id, admin)

        admin.start
        proxy = ::Karafka::Connection::Proxy.new(admin)
        yield(proxy)
      ensure
        begin
          admin&.close
        ensure
          # Needs to run also when closing raised, otherwise the callback (and the instance it
          # references) would stay registered forever
          unbind_oauth(bind_id)
        end
      end

      private

      # Registers an oauth token refresh callback for given rdkafka instance
      #
      # @param id [String, Symbol] unique (for the lifetime of instance) id that we use for
      #   callback referencing
      # @param instance [Rdkafka::Consumer, Rdkafka::Admin] rdkafka instance to be used to set
      #   appropriate oauth token when needed
      def bind_oauth(id, instance)
        callbacks = ::Karafka::Core::Instrumentation.oauthbearer_token_refresh_callbacks
        callbacks.add(id, Instrumentation::Callbacks::OauthbearerTokenRefresh.new(instance))
      end

      # Unregisters the oauth token refresh callback of an instance that is no longer used
      #
      # @param id [String, Symbol] unique (for the lifetime of instance) id that we use for
      #   callback referencing
      def unbind_oauth(id)
        callbacks = ::Karafka::Core::Instrumentation.oauthbearer_token_refresh_callbacks
        callbacks.delete(id)
      end

      # Lists names of all the topics reported in the cluster metadata
      #
      # @return [Array<String>] topics names
      def topics_names
        cluster_topics = cluster_info.topics
        cluster_topics.map { |details| details.fetch(:topic_name) }
      end

      # There are some cases where rdkafka admin operations finish successfully but without the
      # callback being triggered to materialize the post-promise object. Until this is fixed we
      # can figure out, that the operation we wanted to do finished successfully by checking that
      # the effect of the command (new topic, more partitions, etc) is visible. Exactly for that
      # we use the breaker. If we get a timeout, we can check that what we wanted to achieve has
      # happened via the breaker check, hence we do not need to wait any longer.
      #
      # @param handler [Proc] the wait handler operation
      # @param breaker [Proc] extra condition upon timeout that indicates things were finished ok
      # @raise [Rdkafka::AbstractHandle::WaitTimeoutError, Errors::ResultNotVisibleError] the
      #   last failure when the effect is still not visible after all the attempts
      def with_re_wait(handler, breaker)
        (1..).each do |attempt|
          handler.call

          # If breaker does not operate, it means that the requested change was applied but is
          # still not visible and we need to wait
          raise(Errors::ResultNotVisibleError) unless breaker.call

          return
        rescue Rdkafka::AbstractHandle::WaitTimeoutError, Errors::ResultNotVisibleError
          # The effect may have become visible in the meantime, in which case we are done
          return if breaker.call

          # Give up with the most recent error once the attempts budget is used
          raise if attempt > app_config.admin.max_attempts
        end
      end

      # Builds rdkafka config out of the app kafka settings, admin kafka settings and extras
      #
      # @param type [Symbol] type of config we want
      # @param settings [Hash] extra settings for config (if needed)
      # @return [::Rdkafka::Config] rdkafka config
      def config(type, settings)
        # Merge produces a new hash, so the app level kafka settings are never altered
        kafka_settings = app_config.kafka.dup.merge(app_config.admin.kafka)
        kafka_settings[:'group.id'] = app_config.admin.group_id
        # Extra settings are applied after the group id is set so it can be altered if needed
        # In general in admin we only should alter it when we need to impersonate a given
        # consumer group or do something similar
        kafka_settings.merge!(settings)

        scoped_settings = Karafka::Setup::AttributesMap.public_send(type, kafka_settings)

        ::Rdkafka::Config.new(scoped_settings)
      end

      # Converts a time based offset into a regular one. Non-time offsets are returned as they
      # are, without any resolution.
      # @param consumer [::Rdkafka::Consumer]
      # @param name [String, Symbol] expected topic name
      # @param partition [Integer]
      # @param offset [Integer, Time]
      # @return [Integer] expected offset
      # @raise [Errors::InvalidTimeBasedOffsetError] when no offset could be found for the time
      def resolve_offset(consumer, name, partition, offset)
        return offset unless offset.is_a?(Time)

        lookup = ::Rdkafka::Consumer::TopicPartitionList.new
        lookup.add_topic_and_partitions_with_offsets(name, partition => offset)

        candidates = consumer.offsets_for_times(lookup).to_h.fetch(name)
        # Only the result of the partition we have asked about is of interest
        match = candidates.find { |candidate| candidate.partition == partition }
        resolved = match&.offset

        raise(Errors::InvalidTimeBasedOffsetError) unless resolved

        resolved
      end

      # Shortcut to the Karafka application configuration
      #
      # @return [Karafka::Core::Configurable::Node] root node config
      def app_config
        karafka_app = ::Karafka::App
        karafka_app.config
      end
    end
  end
end