sonots/haikanko

View on GitHub
pipework/fluentd/server/plugin/out_numeric_monitor.rb

Summary

Maintainability
D
1 day
Test Coverage
class Fluent::NumericMonitorOutput < Fluent::Output
  Fluent::Plugin.register_output('numeric_monitor', self)

  EMIT_STREAM_RECORDS = 100

  config_param :count_interval, :time, :default => 60
  config_param :unit, :string, :default => nil
  config_param :tag, :string, :default => 'monitor'
  config_param :tag_prefix, :string, :default => nil

  config_param :aggregate, :default => 'tag' do |val|
    case val
    when 'tag' then :tag
    when 'all' then :all
    else
      raise Fluent::ConfigError, "aggregate MUST be one of 'tag' or 'all'"
    end
  end
  config_param :input_tag_remove_prefix, :string, :default => nil
  config_param :monitor_key, :string
  config_param :output_key_prefix, :string, :default => nil
  config_param :percentiles, :default => nil do |val|
    values = val.split(",").map(&:to_i)
    if values.select{|i| i < 1 or i > 99 }.size > 0
      raise Fluent::ConfigError, "percentiles MUST be specified between 1 and 99 by integer"
    end
    values
  end

  config_param :samples_limit, :integer, :default => 1000000

  attr_accessor :count, :last_checked

  def configure(conf)
    super

    if @unit
      @count_interval = case @unit
                        when 'minute' then 60
                        when 'hour' then 3600
                        when 'day' then 86400
                        else
                          raise Fluent::ConfigError, "unit must be one of minute/hour/day"
                        end
    end

    if @input_tag_remove_prefix
      @removed_prefix_string = @input_tag_remove_prefix + '.'
      @removed_length = @removed_prefix_string.length
    end

    if @aggregate == :tag
      raise Fluent::ConfigError, "tag_prefix must be specified with output_per_tag" unless @tag_prefix
      @tag_prefix_string = @tag_prefix + '.'
    end
    
    @count = count_initialized
    @mutex = Mutex.new
  end

  def start
    super
    start_watch
  end

  def shutdown
    super
    @watcher.terminate
    @watcher.join
  end

  def start_watch
    # for internal, or tests
    @watcher = Thread.new(&method(:watch))
  end

  def watch
    @last_checked = Fluent::Engine.now
    while true
      sleep 0.5
      if Fluent::Engine.now - @last_checked > @count_interval
        now = Fluent::Engine.now
        flush_emit
        @last_checked = now
      end
    end
  end

  def count_initialized(keys=nil)
    # counts['tag'] = {:min => num, :max => num, :sum => num, :num => num [, :sample => [....]]}
    if @aggregate == :all
      if @percentiles
        {'all' => {:min => nil, :max => nil, :sum => nil, :num => 0, :sample => []}}
      else
        {'all' => {:min => nil, :max => nil, :sum => nil, :num => 0}}
      end
    elsif keys
      values = if @percentiles
                 Array.new(keys.length) {|i| {:min => nil, :max => nil, :sum => nil, :num => 0, :sample => []}}
               else
                 Array.new(keys.length) {|i| {:min => nil, :max => nil, :sum => nil, :num => 0}}
               end
      Hash[[keys, values].transpose]
    else
      {}
    end
  end

  def stripped_tag(tag)
    return tag unless @input_tag_remove_prefix
    return tag[@removed_length..-1] if tag.start_with?(@removed_prefix_string) and tag.length > @removed_length
    return tag[@removed_length..-1] if tag == @input_tag_remove_prefix
    tag
  end

  def generate_output(count)
    output = {}
    if @aggregate == :all
      c = count['all']
      if c[:num] then output['num'] = c[:num] end
      if c[:min] then output['min'] = c[:min] end
      if c[:max] then output['max'] = c[:max] end
      if c[:num] > 0 then output['avg'] = (c[:sum] / (c[:num] * 1.0)) end
      if @percentiles
        sorted = c[:sample].sort
        @percentiles.each do |p|
          i = (c[:num] * p / 100).floor
          if i > 0
            i -= 1
          end
          output["percentile_#{p}"] = sorted[i]
        end
      end
      return output
    end

    count.keys.each do |tag|
      output[tag] = {}
      # t = stripped_tag(tag)
      t = @output_key_prefix ? @output_key_prefix : @monitor_key
      c = count[tag]
      if c[:num] then output[tag][t + '_num'] = c[:num] end
      if c[:min] then output[tag][t + '_min'] = c[:min] end
      if c[:max] then output[tag][t + '_max'] = c[:max] end
      if c[:num] > 0 then output[tag][t + '_avg'] = (c[:sum] / (c[:num] * 1.0)) end
      if @percentiles
        sorted = c[:sample].sort
        @percentiles.each do |p|
          i = (c[:num] * p / 100).floor
          if i > 0
            i -= 1
          end
          output[tag][t + "_percentile_#{p}"] = sorted[i]
        end
      end
    end
    output
  end

  def flush
    flushed,@count = @count,count_initialized(@count.keys.dup)
    generate_output(flushed)
  end

  def flush_emit
    if @aggregate == :tag
      flush.each do |tag, monitor|
        Fluent::Engine.emit(@tag_prefix_string + tag, Fluent::Engine.now, monitor)
      end
    else
      Fluent::Engine.emit(@tag, Fluent::Engine.now, flush)
    end
  end

  def countups(tag, min, max, sum, num, sample)
    if @aggregate == :all
      tag = 'all'
    end

    @mutex.synchronize do
      c = (@count[tag] ||= {:min => nil, :max => nil, :sum => nil, :num => 0})
      
      if c[:min].nil? or c[:min] > min
        c[:min] = min
      end
      if c[:max].nil? or c[:max] < max
        c[:max] = max
      end
      c[:sum] = (c[:sum] || 0) + sum
      c[:num] += num

      if @percentiles
        c[:sample] ||= []
        if c[:sample].size + sample.size > @samples_limit
          (c[:sample].size + sample.size - @samples_limit).times do
            c[:sample].delete_at(rand(c[:sample].size)) 
          end
        end
        c[:sample] += sample
      end
    end
  end

  def emit(tag, es, chain)
    min = nil
    max = nil
    sum = 0
    num = 0
    sample = if @percentiles then [] else nil end

    es.each do |time,record|
      value = record[@monitor_key]
      next if value.nil?

      value = value.to_f
      if min.nil? or min > value
        min = value
      end
      if max.nil? or max < value
        max = value
      end
      sum += value
      num += 1

      if @percentiles
        sample.push(value)
      end
    end
    if @percentiles && sample.size > @samples_limit
      (sample.size - @samples_limit / 2).to_i.times do
        sample.delete_at(rand(sample.size))
      end
    end
    countups(tag, min, max, sum, num, sample)

    chain.next
  end
end