appsignal/appsignal

View on GitHub
lib/appsignal/hooks/sidekiq.rb

Summary

Maintainability
B
6 hrs
Test Coverage
# frozen_string_literal: true

require "yaml"

module Appsignal
  class Hooks
    class SidekiqHook < Appsignal::Hooks::Hook
      register :sidekiq

      def dependencies_present?
        defined?(::Sidekiq)
      end

      def install
        Appsignal::Minutely.probes.register :sidekiq, SidekiqProbe

        ::Sidekiq.configure_server do |config|
          config.server_middleware do |chain|
            chain.add Appsignal::Hooks::SidekiqPlugin
          end
        end
      end
    end

    class SidekiqProbe
      attr_reader :config

      def self.dependencies_present?
        Gem::Version.new(::Redis::VERSION) >= Gem::Version.new("3.3.5")
      end

      def initialize(config = {})
        @config = config
        @cache = {}
        config_string = " with config: #{config}" unless config.empty?
        Appsignal.logger.debug("Initializing Sidekiq probe#{config_string}")
        require "sidekiq/api"
      end

      def call
        track_redis_info
        track_stats
        track_queues
      end

      private

      attr_reader :cache

      def track_redis_info
        return unless ::Sidekiq.respond_to?(:redis_info)
        redis_info = ::Sidekiq.redis_info

        gauge "connection_count", redis_info.fetch("connected_clients")
        gauge "memory_usage", redis_info.fetch("used_memory")
        gauge "memory_usage_rss", redis_info.fetch("used_memory_rss")
      end

      def track_stats
        stats = ::Sidekiq::Stats.new

        gauge "worker_count", stats.workers_size
        gauge "process_count", stats.processes_size
        gauge_delta :jobs_processed, "job_count", stats.processed,
          :status => :processed
        gauge_delta :jobs_failed, "job_count", stats.failed, :status => :failed
        gauge "job_count", stats.retry_size, :status => :retry_queue
        gauge_delta :jobs_dead, "job_count", stats.dead_size, :status => :died
        gauge "job_count", stats.scheduled_size, :status => :scheduled
        gauge "job_count", stats.enqueued, :status => :enqueued
      end

      def track_queues
        ::Sidekiq::Queue.all.each do |queue|
          gauge "queue_length", queue.size, :queue => queue.name
          # Convert latency from seconds to milliseconds
          gauge "queue_latency", queue.latency * 1_000.0, :queue => queue.name
        end
      end

      # Track a gauge metric with the `sidekiq_` prefix
      def gauge(key, value, tags = {})
        tags[:hostname] = hostname if hostname
        Appsignal.set_gauge "sidekiq_#{key}", value, tags
      end

      # Track the delta of two values for a gauge metric
      #
      # First call will store the data for the metric and the second call will
      # set a gauge metric with the difference. This is used for absolute
      # counter values which we want to track as gauges.
      #
      # @example
      #   gauge_delta :my_cache_key, "my_gauge", 10
      #   gauge_delta :my_cache_key, "my_gauge", 15
      #   # Creates a gauge with the value `5`
      # @see #gauge
      def gauge_delta(cache_key, key, value, tags = {})
        previous_value = cache[cache_key]
        cache[cache_key] = value
        return unless previous_value
        new_value = value - previous_value
        gauge key, new_value, tags
      end

      def hostname
        return @hostname if defined?(@hostname)
        if config.key?(:hostname)
          @hostname = config[:hostname]
          Appsignal.logger.debug "Sidekiq probe: Using hostname config " \
            "option #{@hostname.inspect} as hostname"
          return @hostname
        end

        host = nil
        ::Sidekiq.redis { |c| host = c.connection[:host] }
        Appsignal.logger.debug "Sidekiq probe: Using Redis server hostname " \
          "#{host.inspect} as hostname"
        @hostname = host
      end
    end

    # @api private
    class SidekiqPlugin # rubocop:disable Metrics/ClassLength
      include Appsignal::Hooks::Helpers

      UNKNOWN_ACTION_NAME = "unknown".freeze
      JOB_KEYS = %w[
        args backtrace class created_at enqueued_at error_backtrace error_class
        error_message failed_at jid retried_at retry wrapped
      ].freeze

      def call(_worker, item, _queue)
        job_status = nil
        transaction = Appsignal::Transaction.create(
          SecureRandom.uuid,
          Appsignal::Transaction::BACKGROUND_JOB,
          Appsignal::Transaction::GenericRequest.new(
            :queue_start => item["enqueued_at"]
          )
        )

        Appsignal.instrument "perform_job.sidekiq" do
          begin
            yield
          rescue Exception => exception # rubocop:disable Lint/RescueException
            job_status = :failed
            transaction.set_error(exception)
            raise exception
          end
        end
      ensure
        if transaction
          transaction.set_action_if_nil(formatted_action_name(item))
          transaction.params = filtered_arguments(item)
          formatted_metadata(item).each do |key, value|
            transaction.set_metadata key, value
          end
          transaction.set_http_or_background_queue_start
          Appsignal::Transaction.complete_current!
          queue = item["queue"] || "unknown"
          if job_status
            increment_counter "queue_job_count", 1,
              :queue => queue,
              :status => job_status
          end
          increment_counter "queue_job_count", 1,
            :queue => queue,
            :status => :processed
        end
      end

      private

      def increment_counter(key, value, tags = {})
        Appsignal.increment_counter "sidekiq_#{key}", value, tags
      end

      def formatted_action_name(job)
        sidekiq_action_name = parse_action_name(job)
        complete_action = sidekiq_action_name =~ /\.|#/
        if complete_action || sidekiq_action_name == UNKNOWN_ACTION_NAME
          return sidekiq_action_name
        end
        "#{sidekiq_action_name}#perform"
      end

      def filtered_arguments(job)
        Appsignal::Utils::HashSanitizer.sanitize(
          parse_arguments(job),
          Appsignal.config[:filter_parameters]
        )
      end

      def formatted_metadata(item)
        {}.tap do |hash|
          (item || {}).each do |key, value|
            next if JOB_KEYS.include?(key)
            hash[key] = truncate(string_or_inspect(value))
          end
        end
      end

      # Based on: https://github.com/mperham/sidekiq/blob/63ee43353bd3b753beb0233f64865e658abeb1c3/lib/sidekiq/api.rb#L316-L334
      def parse_action_name(job)
        args = job.fetch("args", [])
        job_class = job["class"]
        case job_class
        when "Sidekiq::Extensions::DelayedModel"
          safe_load(args[0], job_class) do |target, method, _|
            "#{target.class}##{method}"
          end
        when /\ASidekiq::Extensions::Delayed/
          safe_load(args[0], job_class) do |target, method, _|
            "#{target}.#{method}"
          end
        when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
          wrapped_job = job["wrapped"]
          if wrapped_job
            parse_active_job_action_name_from_wrapped job
          else
            parse_active_job_action_name_from_arguments job
          end
        else
          job_class
        end
      end

      # Return the ActiveJob wrapped job name.
      #
      # Returns "unknown" if no acceptable job class name could be found.
      #
      # @example Payload with "wrapped" value
      #   {
      #     "class" => "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper",
      #     "wrapped" => "MyWrappedJob",
      #     # ...
      #   }
      def parse_active_job_action_name_from_wrapped(job)
        job_class = job["wrapped"]
        case job_class
        when "ActionMailer::DeliveryJob"
          extract_action_mailer_name job["args"]
        when String
          job_class
        else
          unknown_action_name_for job
        end
      end

      # Return the ActiveJob job name based on the job's arguments.
      #
      # Returns "unknown" if no acceptable job class name could be found.
      #
      # @example Payload without "wrapped" value
      #   {
      #     "class" => "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper",
      #     "args" => [{
      #       "job_class" => "MyWrappedJob",
      #       # ...
      #     }]
      #     # ...
      #   }
      def parse_active_job_action_name_from_arguments(job)
        args = job.fetch("args", [])
        first_arg = args[0]
        if first_arg == "ActionMailer::DeliveryJob"
          extract_action_mailer_name args
        elsif active_job_payload?(first_arg)
          first_arg["job_class"]
        else
          unknown_action_name_for job
        end
      end

      # Checks if the first argument in the job payload is an ActiveJob payload.
      def active_job_payload?(arg)
        arg.is_a?(Hash) && arg["job_class"].is_a?(String)
      end

      def unknown_action_name_for(job)
        Appsignal.logger.debug \
          "Unable to determine an action name from Sidekiq payload: #{job}"
        UNKNOWN_ACTION_NAME
      end

      def extract_action_mailer_name(args)
        # Returns in format: MailerClass#mailer_method
        args[0]["arguments"][0..1].join("#")
      end

      # Based on: https://github.com/mperham/sidekiq/blob/63ee43353bd3b753beb0233f64865e658abeb1c3/lib/sidekiq/api.rb#L336-L358
      def parse_arguments(job)
        args = job.fetch("args", [])
        case job["class"]
        when /\ASidekiq::Extensions::Delayed/
          safe_load(args[0], args) do |_, _, arg|
            arg
          end
        when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
          is_wrapped = job["wrapped"]
          first_arg = args[0]
          job_args =
            if is_wrapped || active_job_payload?(first_arg)
              first_arg["arguments"]
            else
              []
            end
          if (is_wrapped || first_arg) == "ActionMailer::DeliveryJob"
            # Remove MailerClass, mailer_method and "deliver_now"
            job_args.drop(3)
          else
            job_args
          end
        else
          # Sidekiq Enterprise argument encryption.
          # More information: https://github.com/mperham/sidekiq/wiki/Ent-Encryption
          if job["encrypt".freeze]
            # No point in showing 150+ bytes of random garbage
            args[-1] = "[encrypted data]".freeze
          end
          args
        end
      end

      # Based on: https://github.com/mperham/sidekiq/blob/63ee43353bd3b753beb0233f64865e658abeb1c3/lib/sidekiq/api.rb#L403-L412
      def safe_load(content, default)
        yield(*YAML.load(content))
      rescue => error
        # Sidekiq issue #1761: in dev mode, it's possible to have jobs enqueued
        # which haven't been loaded into memory yet so the YAML can't be
        # loaded.
        Appsignal.logger.warn "Unable to load YAML: #{error.message}"
        default
      end
    end
  end
end