graylog-labs/gelf-rb

View on GitHub
lib/gelf/notifier.rb

Summary

Maintainability
B
4 hrs
Test Coverage
require 'gelf/transport/udp'
require 'gelf/transport/tcp'
require 'gelf/transport/tcp_tls'

# replace JSON and #to_json with Yajl if available
begin
  require 'yajl/json_gem'
rescue LoadError
  # Yajl is optional: a failed require is deliberately ignored and nothing is replaced.
end

module GELF
  # Graylog2 notifier.
  class Notifier
    # Maximum number of GELF chunks as per GELF spec
    MAX_CHUNKS = 128
    # Chunk size (bytes) selected when max_chunk_size= receives 'WAN'.
    MAX_CHUNK_SIZE_WAN = 1420
    # Chunk size (bytes) selected when max_chunk_size= receives 'LAN'.
    MAX_CHUNK_SIZE_LAN = 8154

    # Plain read/write flags; +enabled+ and +collect_file_and_line+ start as true,
    # +rescue_network_errors+ starts as false (see initialize).
    attr_accessor :enabled, :collect_file_and_line, :rescue_network_errors
    # These have custom writers defined below, so only readers are generated here.
    attr_reader :max_chunk_size, :level, :default_options, :level_mapping

    # +host+ and +port+ are host/ip and port of graylog2-server.
    # +max_size+ is passed to max_chunk_size=.
    # +default_options+ is used in notify!; it is duplicated first, so the
    # defaults filled in below are not written into the caller's object.
    def initialize(host = 'localhost', port = 12201, max_size = 'WAN', default_options = {})
      @random = Random.new
      @collect_file_and_line = true
      @enabled = true

      self.rescue_network_errors = false
      self.max_chunk_size = max_size
      self.level = GELF::DEBUG

      self.default_options = default_options.dup
      # The bare name +default_options+ is the parameter here, so the stored
      # (stringified) options have to be fetched through the reader on self.
      defaults = self.default_options
      defaults['version'] = SPEC_VERSION
      defaults['host'] ||= Socket.gethostname
      defaults['level'] ||= GELF::UNKNOWN
      defaults['facility'] ||= 'gelf-rb'
      defaults['protocol'] ||= GELF::Protocol::UDP

      self.level_mapping = :logger
      # The sender is created last because create_sender reads the 'protocol'
      # (and optional 'tls') entries of the options prepared above.
      @sender = create_sender(host, port)
    end

    # Returns the list of receivers, as reported by the sender object
    # that create_sender built in initialize.
    #    notifier.addresses  # => [['localhost', 12201], ['localhost', 12202]]
    def addresses
      @sender.addresses
    end

    # Replaces the list of receivers; +addrs+ is handed to the sender unchanged.
    #    notifier.addresses = [['localhost', 12201], ['localhost', 12202]]
    def addresses=(addrs)
      @sender.addresses = addrs
    end

    # +size+ may be a number of bytes (an Integer, any object responding to
    # +to_int+, or a decimal string such as '1500'), 'WAN' (1420 bytes) or
    # 'LAN' (8154 bytes). The 'WAN'/'LAN' keywords are matched case-insensitively.
    # Default (safe) value is 'WAN'.
    # Raises ArgumentError when +size+ is neither a keyword nor a decimal number.
    def max_chunk_size=(size)
      @max_chunk_size = case size.to_s.downcase
                        when 'wan' then MAX_CHUNK_SIZE_WAN
                        when 'lan' then MAX_CHUNK_SIZE_LAN
                        else
                          # Numeric objects keep the +to_int+ conversion; everything else
                          # (e.g. a value read from a config file or ENV) is parsed strictly
                          # as base 10 so that '010' means ten, not octal eight.
                          size.respond_to?(:to_int) ? size.to_int : Integer(size.to_s, 10)
                        end
    end

    # Sets the minimum level. An Integer is stored as is; any other value is
    # upcased (via +to_s+) and looked up as a constant of the GELF module.
    def level=(new_level)
      @level = new_level.is_a?(Integer) ? new_level : GELF.const_get(new_level.to_s.upcase)
    end

    # Replaces the default options. A Hash is copied with all keys converted to
    # strings by stringify_keys (which raises ArgumentError when a key is present
    # in both symbol and string form); any other object is stored unchanged.
    def default_options=(options)
      @default_options = self.class.stringify_keys(options)
    end

    # +mapping+ may be a hash, 'logger' (GELF::LOGGER_MAPPING) or 'direct' (GELF::DIRECT_MAPPING).
    # The two keywords are compared case-insensitively after +to_s+, so symbols work too;
    # anything else is stored as given.
    # Default (compatible) value is 'logger'.
    def level_mapping=(mapping)
      @level_mapping = case mapping.to_s.downcase
                       when 'logger' then GELF::LOGGER_MAPPING
                       when 'direct' then GELF::DIRECT_MAPPING
                       else mapping
                       end
    end

    # Turns the notifier off; while disabled, notify_with_level! returns
    # without building or sending anything.
    def disable
      @enabled = false
    end

    # Turns the notifier back on (the state a new notifier starts in).
    def enable
      @enabled = true
    end

    # Closes the sender by delegating to its +close+ method.
    def close
      @sender.close
    end

    # Same as notify!, but errors raised while building or sending the message
    # (including +ArgumentError+) are rescued by notify_with_level and sent as a
    # message instead. Network errors (+SocketError+, +SystemCallError+) are
    # re-raised unless +rescue_network_errors+ is set.
    def notify(*args)
      notify_with_level(nil, *args)
    end

    # Sends a message to the Graylog2 server.
    # +args+ can be:
    # - hash-like object (any object which responds to +to_hash+, including +Hash+ instance):
    #    notify!(:short_message => 'All your rebase are belong to us', :user => 'AlekSi')
    # - exception with optional hash-like object:
    #    notify!(SecurityError.new('ALARM!'), :trespasser => 'AlekSi')
    # - string-like object (anything which responds to +to_s+) with optional hash-like object:
    #    notify!('Plain olde text message', :scribe => 'AlekSi')
    # The resulting fields are merged with +default_options+; a default never overwrites
    # a field supplied with the message.
    # This method will raise +ArgumentError+ if arguments are wrong. Consider using notify instead.
    def notify!(*args)
      notify_with_level!(nil, *args)
    end

    # Defines one instance method per constant of GELF::Levels, named after the
    # downcased constant. Each one forwards its arguments to notify_with_level
    # with the level value looked up on the GELF module at call time.
    GELF::Levels.constants.each do |level_name|
      define_method(level_name.downcase) do |*args|
        notify_with_level(GELF.const_get(level_name), *args)
      end
    end

  private

    # Builds the transport for a single receiver at +host+:+port+.
    # UDP is used unless default_options['protocol'] is GELF::Protocol::TCP.
    # For TCP, a 'tls' entry in default_options selects the TLS transport; that
    # entry is removed from default_options and passed to the transport.
    def create_sender(host, port)
      targets = [[host, port]]

      return GELF::Transport::UDP.new(targets) unless default_options['protocol'] == GELF::Protocol::TCP
      return GELF::Transport::TCP.new(targets) unless default_options.key?('tls')

      GELF::Transport::TCPTLS.new(targets, default_options.delete('tls'))
    end

    # Non-raising counterpart of notify_with_level!.
    # - Network errors (+SocketError+, +SystemCallError+) are swallowed when
    #   +rescue_network_errors+ is set and re-raised otherwise.
    # - Any other StandardError (e.g. +ArgumentError+ from bad arguments) is itself
    #   sent as a message with level GELF::UNKNOWN.
    # Only StandardError is rescued: Interrupt, SystemExit, NoMemoryError and other
    # non-standard exceptions must keep propagating instead of being turned into
    # log messages.
    def notify_with_level(message_level, *args)
      notify_with_level!(message_level, *args)
    rescue SocketError, SystemCallError
      raise unless rescue_network_errors
    rescue StandardError => exception
      notify_with_level!(GELF::UNKNOWN, exception)
    end

    # Builds the message hash from +args+ and sends it when its level is at least
    # the notifier's +level+. A non-nil +message_level+ overrides the level found
    # in the message. Returns nil when disabled or filtered out, otherwise
    # whatever the sender returns.
    def notify_with_level!(message_level, *args)
      return unless @enabled

      payload = extract_hash(*args)
      payload['level'] = message_level unless message_level.nil?
      return unless payload['level'] >= level

      unless default_options['protocol'] == GELF::Protocol::TCP
        return @sender.send_datagrams(datagrams_from_hash(payload))
      end

      # TCP path: the level is mapped by validate_hash, then the JSON document
      # is sent with a trailing NUL byte.
      validate_hash(payload)
      @sender.send(payload.to_json + "\0")
    end

    # Builds the complete message hash (string keys) from the notify arguments.
    # +object+ may be hash-like (responds to +to_hash+), an Exception, or anything
    # else, which is turned into 'short_message' via +to_s+. +args+ holds extra fields.
    # Exceptions default to level GELF::ERROR and plain objects to GELF::INFO unless
    # +args+ already carries a level under either :level or 'level'.
    # Fields of +object+ win over +args+, and both win over +default_options+.
    # The caller's +args+ hash is never modified.
    # Raises ArgumentError when mandatory attributes are missing or a key is given
    # in both symbol and string form.
    def extract_hash(object = nil, args = {})
      fallback_level = nil
      primary_data = if object.respond_to?(:to_hash)
                       object.to_hash
                     elsif object.is_a?(Exception)
                       fallback_level = GELF::ERROR
                       self.class.extract_hash_from_exception(object)
                     else
                       fallback_level = GELF::INFO
                       { 'short_message' => object.to_s }
                     end

      # The default level is applied only after the keys are stringified: writing
      # 'level' into +args+ first would mutate the caller's hash and clash with a
      # caller-supplied :level symbol key inside stringify_keys.
      fields = self.class.stringify_keys(args.merge(primary_data))
      fields['level'] ||= fallback_level if fallback_level

      hash = default_options.merge(fields)
      convert_hoptoad_keys_to_graylog2(hash)
      set_file_and_line(hash) if @collect_file_and_line
      set_timestamp(hash)
      check_presence_of_mandatory_attributes(hash)
      hash
    end

    # Turns +exception+ into message fields: 'short_message' is "Class: message",
    # 'full_message' is the backtrace (or a placeholder line when the exception
    # has none) preceded by a "Backtrace:" line.
    def self.extract_hash_from_exception(exception)
      trace_lines = exception.backtrace
      trace_lines ||= ["Backtrace is not available."]

      summary = format('%s: %s', exception.class, exception.message)
      details = "Backtrace:\n#{trace_lines.join("\n")}"

      { 'short_message' => summary, 'full_message' => details }
    end

    # Converts Hoptoad-specific keys in +hash+ to Graylog2-specific ones.
    # When 'short_message' is blank and both 'error_class' and 'error_message'
    # are present, the two keys are removed and combined into
    # 'short_message' => "error_class: error_message". Otherwise +hash+ is untouched.
    def convert_hoptoad_keys_to_graylog2(hash)
      return unless hash['short_message'].to_s.empty?
      return unless hash.key?('error_class') && hash.key?('error_message')

      error_class = hash.delete('error_class')
      error_message = hash.delete('error_message')
      # Interpolation instead of String#+ so that non-String values
      # (a Class object, nil, ...) do not raise NoMethodError.
      hash['short_message'] = "#{error_class}: #{error_message}"
    end

    # Splits a +caller+ entry into path (capture 1) and line number (capture 2).
    CALLER_REGEXP = /^(.*):(\d+).*/
    # Call-stack entries whose text contains this path fragment are skipped.
    LIB_GELF_PATTERN = File.join('lib', 'gelf')

    # Stores the location of the first call-stack entry that does not contain
    # LIB_GELF_PATTERN into hash['file'] and hash['line'].
    # When no such entry exists, or it cannot be parsed, +hash+ is left unchanged
    # instead of failing with NoMethodError on nil.
    def set_file_and_line(hash)
      frame = caller.find { |entry| !entry.include?(LIB_GELF_PATTERN) }
      match = frame && CALLER_REGEXP.match(frame)
      return unless match

      hash['file'] = match[1]
      hash['line'] = match[2].to_i
    end

    # Fills hash['timestamp'] with the current UTC time as a Float, but only
    # when it is nil; any other existing value is kept.
    def set_timestamp(hash)
      return unless hash['timestamp'].nil?

      hash['timestamp'] = Time.now.utc.to_f
    end

    # Raises ArgumentError for the first of 'version', 'short_message' and 'host'
    # whose value in +hash+ is missing or has an empty string representation.
    def check_presence_of_mandatory_attributes(hash)
      mandatory = %w(version short_message host)
      mandatory.each do |name|
        next unless hash[name].to_s.empty?

        raise ArgumentError, "#{name} is missing. Options version, short_message and host must be set."
      end
    end

    # Serializes +hash+ and returns the list of datagrams to send.
    # Data that fits into @max_chunk_size bytes yields a single datagram.
    # Larger data is split into chunks (GELF v1.0 supports chunking), each prefixed
    # with the bytes 0x1e 0x0f, an 8-byte message id, the chunk index and the
    # chunk count. Raises ArgumentError when more than MAX_CHUNKS would be needed.
    def datagrams_from_hash(hash)
      payload = serialize_hash(hash)
      total_bytes = payload.count
      return [payload.to_a.pack('C*')] unless total_bytes > @max_chunk_size

      # The id shared by all chunks is derived from the current time plus random bytes.
      seed = @random.bytes(8)
      message_id = Digest::MD5.digest("#{Time.now.to_f}-#{seed}")[0, 8]

      chunk_total = (total_bytes.to_f / @max_chunk_size).ceil
      if chunk_total > MAX_CHUNKS
        raise ArgumentError, "Data too big (#{total_bytes} bytes), would create more than #{MAX_CHUNKS} chunks!"
      end

      payload.each_slice(@max_chunk_size).each_with_index.map do |bytes, index|
        "\x1e\x0f" + message_id + [index, chunk_total, *bytes].pack('C*')
      end
    end

    # Rejects a nil or empty +hash+ with ArgumentError, then replaces
    # hash['level'] with its entry in @level_mapping.
    def validate_hash(hash)
      missing = hash.nil? || hash.empty?
      raise ArgumentError, "Hash is empty." if missing

      mapped_level = @level_mapping[hash['level']]
      hash['level'] = mapped_level
    end

    # Validates +hash+ (which also maps its level), converts it to JSON and
    # returns the bytes of the Zlib-deflated document.
    def serialize_hash(hash)
      validate_hash(hash)

      json = hash.to_json
      compressed = Zlib::Deflate.deflate(json)
      compressed.bytes
    end

    # Returns a copy of +data+ with every key converted via +to_s+.
    # Anything that is not a Hash is returned as is.
    # Raises ArgumentError when a non-string key and its string form are both
    # present, because one of the two values would be lost otherwise.
    def self.stringify_keys(data)
      return data unless data.is_a?(Hash)

      result = {}
      data.each do |key, value|
        name = key.to_s
        conflict = (key != name) && data.key?(name)
        raise ArgumentError, "Both #{key.inspect} and #{name} are present." if conflict

        result[name] = value
      end
      result
    end
  end
end