karafka/karafka

View on GitHub
lib/karafka/pro/swarm/liveness_listener.rb

Summary

Maintainability
A
25 mins
Test Coverage
# frozen_string_literal: true

# This Karafka component is a Pro component under a commercial license.
# This Karafka component is NOT licensed under LGPL.
#
# All of the commercial components are present in the lib/karafka/pro directory of this
# repository and their usage requires commercial license agreement.
#
# Karafka has also commercial-friendly license, commercial support and commercial components.
#
# By sending a pull request to the pro components, you are agreeing to transfer the copyright of
# your code to Maciej Mensfeld.

module Karafka
  module Pro
    # Pro Swarm components namespace
    module Swarm
      # Pro listener that monitors RSS usage and other heartbeat metrics (if configured) to ensure
      # that everything operates as expected.
      #
      # It can:
      #   - monitor poll frequency to make sure that polling happens often enough
      #   - monitor consumption to make sure we do not process data for too long
      #   - monitor RSS to make sure that we do not use too much memory
      #
      # By default it does **not** monitor memory, while consuming and polling are configured in
      # such a way as to align with `max.poll.interval.ms` and other defaults.
      #
      # Failure statuses reported are as follows:
      #   - 1 - polling ttl exceeded
      #   - 2 - consuming ttl exceeded
      #   - 3 - memory limit exceeded
      #
      # @note This listener should not break anything if subscribed in the supervisor prior to
      #   forking as it relies on server events for operations.
      class LivenessListener < Karafka::Swarm::LivenessListener
        # @param memory_limit [Integer, Float, String] max memory in MB for this process to be
        #   considered healthy. Strings are cast with `#to_i` (so '10MB' becomes 10). The default
        #   (infinity) means that the memory usage is not checked at all.
        # @param consuming_ttl [Integer] time in ms after which we consider consumption hanging.
        #   It allows us to define max consumption time after which supervisor should consider
        #   given process as hanging
        # @param polling_ttl [Integer] max time in ms for polling. If polling (any) does not
        #   happen that often, process should be considered dead.
        # @note The default TTL matches the default `max.poll.interval.ms`
        def initialize(
          memory_limit: Float::INFINITY,
          consuming_ttl: 5 * 60 * 1_000,
          polling_ttl: 5 * 60 * 1_000
        )
          @polling_ttl = polling_ttl
          @consuming_ttl = consuming_ttl
          # We cast it just in case someone would provide '10MB' or something similar
          @memory_limit = memory_limit.is_a?(String) ? memory_limit.to_i : memory_limit
          # Most recent polling tick per thread (thread id => monotonic time)
          @pollings = {}
          # Start time of the consumer work currently running per thread (thread id => monotonic
          # time). An entry exists only while the work is in progress.
          @consumptions = {}

          super()
        end

        # Tick on each fetch
        #
        # @param _event [Karafka::Core::Monitoring::Event]
        def on_connection_listener_fetch_loop(_event)
          mark_polling_tick
        end

        # For each consumer action we define a pair of handlers: the "before" one registers the
        # consumption tick and the "after" one removes it once the work is done
        {
          consume: :consumed,
          revoke: :revoked,
          shutting_down: :shutdown,
          tick: :ticked
        }.each do |before, after|
          class_eval <<~RUBY, __FILE__, __LINE__ + 1
            # Tick on starting work
            # @param _event [Karafka::Core::Monitoring::Event]
            def on_consumer_#{before}(_event)
              mark_consumption_tick
            end

            # Tick on finished work
            # @param _event [Karafka::Core::Monitoring::Event]
            def on_consumer_#{after}(_event)
              clear_consumption_tick
            end
          RUBY
        end

        # Removes both the consumption and the polling tick of the current thread when an error
        # is reported
        #
        # @param _event [Karafka::Core::Monitoring::Event]
        def on_error_occurred(_event)
          clear_consumption_tick
          clear_polling_tick
        end

        # Reports the current status once in a while
        #
        # @param _event [Karafka::Core::Monitoring::Event]
        def on_statistics_emitted(_event)
          periodically do
            # Without a node there is nobody to report to
            return unless node

            current_status = status

            current_status.positive? ? node.unhealthy(current_status) : node.healthy
          end
        end

        # Deregister the polling tracker for given listener
        # @param _event [Karafka::Core::Monitoring::Event]
        def on_connection_listener_stopping(_event)
          # We are interested in disabling tracking for given listener only if it was requested
          # when karafka was running. If we would always clear, it would not catch the shutdown
          # polling requirements. The "running" listener shutdown operations happen only when
          # the manager requests it for downscaling.
          return if Karafka::App.done?

          clear_polling_tick
        end

        # Deregister the polling tracker for given listener
        # @param _event [Karafka::Core::Monitoring::Event]
        def on_connection_listener_stopped(_event)
          # Same reasoning as for the stopping event: we clear only when karafka is still running
          return if Karafka::App.done?

          clear_polling_tick
        end

        private

        # @return [Integer] object id of the current thread
        def thread_id
          Thread.current.object_id
        end

        # Update the polling tick time for current thread
        def mark_polling_tick
          synchronize do
            @pollings[thread_id] = monotonic_now
          end
        end

        # Clear current thread polling time tracker
        def clear_polling_tick
          synchronize do
            @pollings.delete(thread_id)
          end
        end

        # Update the processing tick time
        def mark_consumption_tick
          synchronize do
            @consumptions[thread_id] = monotonic_now
          end
        end

        # Clear current thread consumption time tracker
        def clear_consumption_tick
          synchronize do
            @consumptions.delete(thread_id)
          end
        end

        # Did we exceed any of the ttls or the memory limit
        #
        # @return [Integer] 0 when everything is ok, otherwise the failure status:
        #   - 1 - polling ttl exceeded
        #   - 2 - consuming ttl exceeded
        #   - 3 - memory limit exceeded
        def status
          time = monotonic_now

          # `#values` gives us a separate array, so we never iterate over the tracking hashes
          # themselves
          return 1 if @pollings.values.any? { |tick| (time - tick) > @polling_ttl }
          return 2 if @consumptions.values.any? { |tick| (time - tick) > @consuming_ttl }
          # Nothing can exceed an infinite limit, so we do not read the RSS when there is no limit
          return 3 if @memory_limit != Float::INFINITY && rss_mb > @memory_limit

          0
        end

        # @return [Integer] RSS of the node process rounded to the nearest MB or 0 if the `VmRSS`
        #   entry is not present in the process status file
        # @note Since swarm is linux only, we do not have to worry about getting RSS on other OSes
        def rss_mb
          kb_rss = 0

          # We read line by line and stop on the first match not to load the whole file
          File.foreach("/proc/#{node.pid}/status") do |line|
            next unless line.start_with?('VmRSS:')

            # The second token of the entry is the size in kB
            kb_rss = line.split[1].to_i

            break
          end

          # Float division is needed, otherwise the result would be truncated before rounding
          (kb_rss / 1_024.0).round
        end
      end
    end
  end
end