mperham/sidekiq

View on GitHub
lib/sidekiq/monitor.rb

Summary

Maintainability
A
25 mins
Test Coverage
B
88%
#!/usr/bin/env ruby

require "fileutils"
require "sidekiq/api"

class Sidekiq::Monitor
  class Status
    VALID_SECTIONS = %w[all version overview processes queues]
    COL_PAD = 2

    def display(section = nil)
      section ||= "all"
      unless VALID_SECTIONS.include? section
        puts "I don't know how to check the status of '#{section}'!"
        puts "Try one of these: #{VALID_SECTIONS.join(", ")}"
        return
      end
      send(section)
    rescue => e
      puts "Couldn't get status: #{e}"
    end

    def all
      version
      puts
      overview
      puts
      processes
      puts
      queues
    end

    def version
      puts "Sidekiq #{Sidekiq::VERSION}"
      puts Time.now.utc
    end

    def overview
      puts "---- Overview ----"
      puts "  Processed: #{delimit stats.processed}"
      puts "     Failed: #{delimit stats.failed}"
      puts "       Busy: #{delimit stats.workers_size}"
      puts "   Enqueued: #{delimit stats.enqueued}"
      puts "    Retries: #{delimit stats.retry_size}"
      puts "  Scheduled: #{delimit stats.scheduled_size}"
      puts "       Dead: #{delimit stats.dead_size}"
    end

    def processes
      puts "---- Processes (#{process_set.size}) ----"
      process_set.each_with_index do |process, index|
        puts "#{process["identity"]} #{tags_for(process)}"
        puts "  Started: #{Time.at(process["started_at"])} (#{time_ago(process["started_at"])})"
        puts "  Threads: #{process["concurrency"]} (#{process["busy"]} busy)"
        puts "   Queues: #{split_multiline(process["queues"].sort, pad: 11)}"
        puts "" unless (index + 1) == process_set.size
      end
    end

    def queues
      puts "---- Queues (#{queue_data.size}) ----"
      columns = {
        name: [:ljust, (["name"] + queue_data.map(&:name)).map(&:length).max + COL_PAD],
        size: [:rjust, (["size"] + queue_data.map(&:size)).map(&:length).max + COL_PAD],
        latency: [:rjust, (["latency"] + queue_data.map(&:latency)).map(&:length).max + COL_PAD],
      }
      columns.each { |col, (dir, width)| print col.to_s.upcase.public_send(dir, width) }
      puts
      queue_data.each do |q|
        columns.each do |col, (dir, width)|
          print q.send(col).public_send(dir, width)
        end
        puts
      end
    end

    private

    def delimit(number)
      number.to_s.reverse.scan(/.{1,3}/).join(",").reverse
    end

    def split_multiline(values, opts = {})
      return "none" unless values
      pad = opts[:pad] || 0
      max_length = opts[:max_length] || (80 - pad)
      out = []
      line = ""
      values.each do |value|
        if (line.length + value.length) > max_length
          out << line
          line = " " * pad
        end
        line << value + ", "
      end
      out << line[0..-3]
      out.join("\n")
    end

    def tags_for(process)
      tags = [
        process["tag"],
        process["labels"],
        (process["quiet"] == "true" ? "quiet" : nil),
      ].flatten.compact
      tags.any? ? "[#{tags.join("] [")}]" : nil
    end

    def time_ago(timestamp)
      seconds = Time.now - Time.at(timestamp)
      return "just now" if seconds < 60
      return "a minute ago" if seconds < 120
      return "#{seconds.floor / 60} minutes ago" if seconds < 3600
      return "an hour ago" if seconds < 7200
      "#{seconds.floor / 60 / 60} hours ago"
    end

    QUEUE_STRUCT = Struct.new(:name, :size, :latency)
    def queue_data
      @queue_data ||= Sidekiq::Queue.all.map { |q|
        QUEUE_STRUCT.new(q.name, q.size.to_s, sprintf("%#.2f", q.latency))
      }
    end

    def process_set
      @process_set ||= Sidekiq::ProcessSet.new
    end

    def stats
      @stats ||= Sidekiq::Stats.new
    end
  end
end