fluent/fluentd

View on GitHub
lib/fluent/log.rb

Summary

Maintainability
F
5 days
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'forwardable'
require 'logger'
require 'pathname'

module Fluent
  class Log
    # ANSI terminal escape sequences used to colorize log lines.
    # #enable_color uses NORMAL plus the six color codes; RESET, CRE and CLEAR
    # are not referenced elsewhere in this file.
    module TTYColor
      RESET   = "\e]R"     # NOTE(review): an OSC sequence, not the SGR reset "\e[0m" — confirm intent
      CRE     = "\e[K"     # erase to end of line
      CLEAR   = "\ec"      # full terminal reset (RIS)
      NORMAL  = "\e[0;39m" # reset attributes, default foreground color
      RED     = "\e[1;31m" # bold red
      GREEN   = "\e[1;32m" # bold green
      YELLOW  = "\e[1;33m" # bold yellow
      BLUE    = "\e[1;34m" # bold blue
      MAGENTA = "\e[1;35m" # bold magenta
      CYAN    = "\e[1;36m" # bold cyan
      WHITE   = "\e[1;37m" # bold white
    end

    # Fluentd's numeric log levels. A message is written when its level is
    # greater than or equal to the logger's @level. They are one higher than
    # the corresponding ::Logger / ServerEngine levels (see #initialize).
    LEVEL_TRACE = 0
    LEVEL_DEBUG = 1
    LEVEL_INFO  = 2
    LEVEL_WARN  = 3
    LEVEL_ERROR = 4
    LEVEL_FATAL = 5

    # Level names indexed by the LEVEL_* values above. Frozen because it is a
    # shared, read-only lookup table.
    LEVEL_TEXT = %w(trace debug info warn error fatal).freeze

    # Fluentd's own log events are tagged "#{LOG_EVENT_TAG_PREFIX}.<level>".
    LOG_EVENT_TAG_PREFIX = 'fluent'
    LOG_EVENT_LABEL = '@FLUENT_LOG'
    LOG_TYPE_SUPERVISOR = :supervisor # only in supervisor, or a worker with --no-supervisor
    LOG_TYPE_WORKER0 = :worker0 # only in a worker with worker_id=0 (without showing worker id)
    LOG_TYPE_DEFAULT = :default # show logs in all supervisor/workers, with worker id in workers (default)

    # Every log type accepted as the optional first argument of the level methods.
    LOG_TYPES = [LOG_TYPE_SUPERVISOR, LOG_TYPE_WORKER0, LOG_TYPE_DEFAULT].freeze
    # NOTE(review): presumably the named rotation periods accepted for log
    # rotation; not referenced in this file. Frozen like the other tables.
    LOG_ROTATE_AGE = %w(daily weekly monthly).freeze

    # Upper bound on the per-thread message cache used by #ignore_same_log?.
    IGNORE_SAME_LOG_MAX_CACHE_SIZE = 1000 # If need, make this an option of system config.

    # Convert a level name into one of the LEVEL_* integers.
    #
    # @param log_level_str [String, Symbol] level name such as "info" or :warn;
    #   matching is case-insensitive
    # @return [Integer] the corresponding LEVEL_* value
    # @raise [RuntimeError] if the name is not a known level (including nil)
    def self.str_to_level(log_level_str)
      # #to_s lets Symbols such as :warn through (Symbol#downcase returns a
      # Symbol, which never matched the String branches) and turns nil into ""
      # so an unset value reports "Unknown log level" instead of NoMethodError.
      case log_level_str.to_s.downcase
      when "trace" then LEVEL_TRACE
      when "debug" then LEVEL_DEBUG
      when "info"  then LEVEL_INFO
      when "warn"  then LEVEL_WARN
      when "error" then LEVEL_ERROR
      when "fatal" then LEVEL_FATAL
      else raise "Unknown log level: level = #{log_level_str}"
      end
    end

    # Tags used for Fluentd's own log events, one per entry of LEVEL_TEXT and in
    # the same order, e.g. "fluent.trace", "fluent.debug", ...
    # @return [Array<String>]
    def self.event_tags
      LEVEL_TEXT.map { |level_name| [LOG_EVENT_TAG_PREFIX, level_name].join('.') }
    end

    # Derive a log file path that is unique to one process by inserting a
    # suffix in front of the file extension.
    #
    # >>> per_process_path("C:/tmp/test.log", :worker, 1)
    # C:/tmp/test-1.log
    # >>> per_process_path("C:/tmp/test.log", :supervisor, 0)
    # C:/tmp/test-supervisor-0.log
    #
    # @param path [String, Pathname] base log path
    # @param process_type [Symbol] :supervisor gets "-supervisor-0"; any other
    #   value gets "-<worker_id>"
    # @param worker_id [Integer] used only for non-supervisor processes
    # @return [String]
    def self.per_process_path(path, process_type, worker_id)
      base = Pathname(path)
      # The supervisor keeps a trailing "-0" for backward compatibility.
      tag = process_type == :supervisor ? "#{process_type}-0" : worker_id.to_s
      base.sub_ext("-#{tag}#{base.extname}").to_s
    end

    # Wrap +logger+ with Fluentd's leveled and formatted logging.
    #
    # +logger+ must respond to #level / #level= and keep its output device in
    # an @logdev instance variable (a ::Logger subclass such as the
    # ServerEngine::DaemonLogger built by #dup).
    #
    # Options (+opts+):
    #   :path                         - log file path; #reopen! reopens it unless it is "-"
    #   :format                       - :text (default) or :json, see #format=
    #   :time_format                  - strftime-style pattern for timestamps
    #   :suppress_repeated_stacktrace - replace a backtrace equal to this thread's previous one with a one-line notice
    #   :ignore_repeated_log_interval - drop a message equal to this thread's previous one within this many seconds
    #   :ignore_same_log_interval     - drop any message this thread already logged within this many seconds
    #   :process_type                 - :supervisor, :worker0, :workers or :standalone (default)
    #   :worker_id                    - shown as "#<id> " in :default-type lines from workers
    #
    # @raise [RuntimeError] if :process_type is not one of the values above
    def initialize(logger, opts={})
      # When ServerEngine changes logger.level, the Fluentd logger level should change too,
      # so logger.level= is overwritten below.
      # However, Fluentd currently doesn't use ServerEngine's reloading feature,
      # so this overwriting may no longer be needed.
      orig_logger_level_setter = logger.class.public_instance_method(:level=).bind(logger)
      me = self
      # Ruby's standard Logger numbers its levels like this:
      # DEBUG = 0
      # INFO  = 1
      # WARN  = 2
      # ERROR = 3
      # FATAL = 4
      # ServerEngine uses these numbers and additionally uses -1 for the TRACE level.
      # TRACE = -1
      #
      # Fluentd, on the other hand, numbers its levels like this:
      # TRACE = 0
      # DEBUG = 1
      # INFO  = 2
      # WARN  = 3
      # ERROR = 4
      # FATAL = 5
      #
      # So Fluentd's level is ServerEngine's level + 1.
      # If ServerEngine's logger level is changed, Fluentd's log level is changed to that value + 1.
      logger.define_singleton_method(:level=) {|level| orig_logger_level_setter.call(level); me.level = self.level + 1 }

      @path = opts[:path]
      @logger = logger
      # Raw output device of the wrapped logger (read from its private @logdev);
      # #write, #flush and #reset go to it directly.
      @out = logger.instance_variable_get(:@logdev)
      @level = logger.level + 1
      @debug_mode = false
      @log_event_enabled = false
      # Stack depth handed to #caller_line by the :text formatter.
      @depth_offset = 1
      @format = nil
      @time_format = nil
      @formatter = nil

      # Order matters: #format= resets @time_format to its default, so a custom
      # :time_format has to be applied after it.
      self.format = opts.fetch(:format, :text)
      self.time_format = opts[:time_format] if opts.key?(:time_format)
      # Colorize only when the output device is a terminal.
      enable_color out.tty?
      # TODO: rename; this holds the threads whose log calls must not emit log
      # events (filled by #disable_events, checked in #event).
      @threads_exclude_events = []

      # Fluent::Engine requires Fluent::Log, so we must take that object lazily
      @engine = Fluent.const_get('Engine')
      @optional_header = nil
      @optional_attrs = nil

      @suppress_repeated_stacktrace = opts[:suppress_repeated_stacktrace]
      @ignore_repeated_log_interval = opts[:ignore_repeated_log_interval]
      @ignore_same_log_interval = opts[:ignore_same_log_interval]

      # Decide which log types (see #skipped_type?) this process prints.
      @process_type = opts[:process_type] # :supervisor, :worker0, :workers Or :standalone
      @process_type ||= :standalone # to keep behavior of existing code
      case @process_type
      when :supervisor
        @show_supervisor_log = true
        @show_worker0_log = false
      when :worker0
        @show_supervisor_log = false
        @show_worker0_log = true
      when :workers
        @show_supervisor_log = false
        @show_worker0_log = false
      when :standalone
        @show_supervisor_log = true
        @show_worker0_log = true
      else
        raise "BUG: unknown process type for logger:#{@process_type}"
      end
      @worker_id = opts[:worker_id]
      @worker_id_part = "##{@worker_id} " # used only for :default log type in workers
    end

    # Build an independent logger of the same class that writes to the same
    # output device (@out) at the same level.
    #
    # Carried over: level, process type, worker id, repeat-suppression
    # settings, output format, time format and the log-event flag.
    # Optional headers/attrs are intentionally not copied, because each new
    # PluginLogger is expected to install its own.
    #
    # @return [Log]
    def dup
      # ServerEngine levels are one lower than Fluentd's (see #initialize).
      daemon_opts = { log_level: @level - 1 }
      daemon_logger = ServerEngine::DaemonLogger.new(@out, daemon_opts)
      copy = self.class.new(
        daemon_logger,
        suppress_repeated_stacktrace: @suppress_repeated_stacktrace,
        process_type: @process_type,
        worker_id: @worker_id,
        ignore_repeated_log_interval: @ignore_repeated_log_interval,
        ignore_same_log_interval: @ignore_same_log_interval
      )
      copy.format = @format
      copy.time_format = @time_format
      copy.log_event_enabled = @log_event_enabled
      copy
    end

    attr_reader :format, :time_format
    attr_accessor :out
    attr_accessor :log_event_enabled, :suppress_repeated_stacktrace
    attr_accessor :ignore_repeated_log_interval, :ignore_same_log_interval
    attr_accessor :optional_header, :optional_attrs

    # NOTE: assigning #level= changes only Fluentd's @level; the wrapped
    # @logger.level is left untouched. Strictly speaking it should be updated
    # too, but #initialize redefines @logger.level= to write back into this
    # object's level, so naive propagation would recurse endlessly. This class
    # does not otherwise consult the wrapped logger's level, so leaving the two
    # out of sync is currently harmless.
    attr_accessor :level

    # Redirect output to +logdev+. The device is stored both as this object's
    # @out (used by #write/#flush/#reset) and as the wrapped logger's @logdev
    # (used when #puts writes through @logger).
    # @return [nil]
    def logdev=(logdev)
      # @out is assigned first, then handed to the wrapped logger.
      @logger.instance_variable_set(:@logdev, @out = logdev)
      nil
    end

    # Select the line formatter used by the level methods: :text or :json.
    #
    # Re-assigning the current format is a no-op. Any other call first resets
    # the time format to '%Y-%m-%d %H:%M:%S %z', so #time_format= must be
    # applied afterwards to keep a custom pattern. An unrecognized +fmt+ leaves
    # @format and @formatter unchanged, although the time format is still reset.
    #
    # The :text formatter calls #caller_line directly from its block, which
    # finds the log call site by stack depth in debug mode. Do not add stack
    # frames between them.
    #
    # @param fmt [Symbol] :text or :json
    # @return [nil]
    def format=(fmt)
      return if @format == fmt

      @time_format = '%Y-%m-%d %H:%M:%S %z'
      @time_formatter = Strftime.new(@time_format) rescue nil

      case fmt
      when :text
        @format = :text
        @formatter = proc do |type, time, level, msg|
          # caller_line returns a fresh String, so appending to it is safe.
          caller_line(type, time, @depth_offset, level) << msg
        end
      when :json
        @format = :json
        @formatter = proc do |type, time, level, msg|
          record = { 'time' => format_time(time), 'level' => LEVEL_TEXT[level], 'message' => msg }
          worker_id = get_worker_id(type)
          record['worker_id'] = worker_id if worker_id
          Yajl.dump(record)
        end
      end

      nil
    end

    # Set the strftime-style pattern used for timestamps.
    #
    # A compiled Strftime formatter is cached when it can be built. Any error
    # while building it (including Strftime being undefined — it is not
    # required in this file) leaves @time_formatter nil, and #format_time then
    # falls back to Time#strftime.
    def time_format=(time_fmt)
      @time_format = time_fmt
      @time_formatter =
        begin
          Strftime.new(time_fmt)
        rescue
          nil
        end
    end

    # @return [Boolean] whether output currently goes to the process's $stdout
    def stdout?
      @out == $stdout
    end

    # Reopen the log file (append mode) at the configured path, e.g. after the
    # file was rotated away. Nothing happens without a path or when the path
    # is "-" (NOTE(review): presumably meaning "log to stdout").
    # @return [nil]
    def reopen!
      return nil unless @path && @path != "-"
      @out.reopen(@path, "a")
      nil
    end

    # Turn debug mode on (default) or off. In debug mode #caller_line adds the
    # log call site (file:line:method) to each text-format line.
    # @return [self]
    def enable_debug(b=true)
      tap { @debug_mode = b }
    end

    # Turn emission of Fluentd log events on (default) or off.
    # @return [self]
    def enable_event(b=true)
      tap { @log_event_enabled = b }
    end

    # Stop log calls made from +thread+ from being emitted as events; useful to
    # suppress event emission in one specific thread. Unlike #enable_event
    # there is no counterpart to re-enable a thread.
    # @return [Array<Thread>, nil] the updated exclusion list, or nil if
    #   +thread+ was already excluded
    def disable_events(thread)
      return if @threads_exclude_events.include?(thread)
      @threads_exclude_events.push(thread)
    end

    # @return [Boolean] whether ANSI color codes are currently emitted
    def enable_color?
      !@color_reset.empty?
    end

    # Turn ANSI coloring of log lines on (default) or off. When off, every color
    # code is an empty String, so the level methods emit plain text.
    # @return [self]
    def enable_color(b=true)
      @color_trace, @color_debug, @color_info, @color_warn, @color_error, @color_fatal, @color_reset =
        if b
          [TTYColor::BLUE, TTYColor::WHITE, TTYColor::GREEN, TTYColor::YELLOW,
           TTYColor::MAGENTA, TTYColor::RED, TTYColor::NORMAL]
        else
          Array.new(7) { '' }
        end
      self
    end

    # Remove an explicit log type from the front of +args+, if present.
    # @param args [Array] a level method's arguments; its first element is
    #   shifted off when it is one of LOG_TYPES
    # @return [Symbol] the removed type, or LOG_TYPE_DEFAULT
    def log_type(args)
      return LOG_TYPE_DEFAULT unless LOG_TYPES.include?(args.first)
      args.shift
    end

    # TODO: skip :worker0 logs when Fluentd gracefully restarted
    #
    # Whether messages of log +type+ must be dropped by this process.
    # :default messages are always shown; :worker0 and :supervisor messages
    # follow the flags chosen from the process type in #initialize.
    # @raise [RuntimeError] for a type outside LOG_TYPES
    def skipped_type?(type)
      case type
      when LOG_TYPE_DEFAULT    then false
      when LOG_TYPE_WORKER0    then !@show_worker0_log
      when LOG_TYPE_SUPERVISOR then !@show_supervisor_log
      else raise "BUG: unknown log type:#{type}"
      end
    end

    # Yield only when TRACE messages are enabled for this logger.
    # @return the block's value, or nil when TRACE is filtered out
    def on_trace
      yield unless @level > LEVEL_TRACE
    end

    # Log a message at TRACE level.
    #
    # +args+ may start with a log type (one of LOG_TYPES, default :default),
    # followed by message parts (appended via #to_s) and Hashes of attributes.
    # A block, if given, is called only when the message will be logged, and
    # its result is appended as the last message part.
    #
    # Never raises: any error while logging is swallowed.
    # @return [String, nil] the written line, or nil when nothing was written
    def trace(*args, &block)
      return if @level > LEVEL_TRACE
      type = log_type(args)
      return if skipped_type?(type)
      args.push(block.call) if block
      time, msg = event(:trace, args)
      return unless time
      # In debug mode the :text formatter finds the call site by stack depth,
      # so it must be invoked directly from this method.
      body = @formatter.call(type, time, LEVEL_TRACE, msg)
      puts([@color_trace, body, @color_reset].join)
    rescue
      # logger should not raise an exception. This rescue prevents unexpected behaviour.
      nil
    end
    alias TRACE trace

    # Log +backtrace+ at TRACE level (defaults to the backtrace of $!, the
    # exception currently being rescued).
    # @param type [Symbol] log type, one of LOG_TYPES
    def trace_backtrace(backtrace=$!.backtrace, type: :default)
      dump_stacktrace(type, backtrace, LEVEL_TRACE)
    end

    # Yield only when DEBUG messages are enabled for this logger.
    # @return the block's value, or nil when DEBUG is filtered out
    def on_debug
      yield unless @level > LEVEL_DEBUG
    end

    # Log a message at DEBUG level.
    #
    # +args+ may start with a log type (one of LOG_TYPES, default :default),
    # followed by message parts (appended via #to_s) and Hashes of attributes.
    # A block, if given, is called only when the message will be logged, and
    # its result is appended as the last message part.
    #
    # Never raises: any error while logging is swallowed.
    # @return [String, nil] the written line, or nil when nothing was written
    def debug(*args, &block)
      return if @level > LEVEL_DEBUG
      type = log_type(args)
      return if skipped_type?(type)
      args.push(block.call) if block
      time, msg = event(:debug, args)
      return unless time
      # In debug mode the :text formatter finds the call site by stack depth,
      # so it must be invoked directly from this method.
      body = @formatter.call(type, time, LEVEL_DEBUG, msg)
      puts([@color_debug, body, @color_reset].join)
    rescue
      # A logger must not raise into its caller; drop the message instead.
      nil
    end
    alias DEBUG debug

    # Log +backtrace+ at DEBUG level (defaults to the backtrace of $!, the
    # exception currently being rescued).
    # @param type [Symbol] log type, one of LOG_TYPES
    def debug_backtrace(backtrace=$!.backtrace, type: :default)
      dump_stacktrace(type, backtrace, LEVEL_DEBUG)
    end

    # Yield only when INFO messages are enabled for this logger.
    # @return the block's value, or nil when INFO is filtered out
    def on_info
      yield unless @level > LEVEL_INFO
    end

    # Log a message at INFO level.
    #
    # +args+ may start with a log type (one of LOG_TYPES, default :default),
    # followed by message parts (appended via #to_s) and Hashes of attributes.
    # A block, if given, is called only when the message will be logged, and
    # its result is appended as the last message part.
    #
    # Never raises: any error while logging is swallowed.
    # @return [String, nil] the written line, or nil when nothing was written
    def info(*args, &block)
      return if @level > LEVEL_INFO
      type = log_type(args)
      return if skipped_type?(type)
      args.push(block.call) if block
      time, msg = event(:info, args)
      return unless time
      # In debug mode the :text formatter finds the call site by stack depth,
      # so it must be invoked directly from this method.
      body = @formatter.call(type, time, LEVEL_INFO, msg)
      puts([@color_info, body, @color_reset].join)
    rescue
      # A logger must not raise into its caller; drop the message instead.
      nil
    end
    alias INFO info

    # Log +backtrace+ at INFO level (defaults to the backtrace of $!, the
    # exception currently being rescued).
    # @param type [Symbol] log type, one of LOG_TYPES
    def info_backtrace(backtrace=$!.backtrace, type: :default)
      dump_stacktrace(type, backtrace, LEVEL_INFO)
    end

    # Yield only when WARN messages are enabled for this logger.
    # @return the block's value, or nil when WARN is filtered out
    def on_warn
      yield unless @level > LEVEL_WARN
    end

    # Log a message at WARN level.
    #
    # +args+ may start with a log type (one of LOG_TYPES, default :default),
    # followed by message parts (appended via #to_s) and Hashes of attributes.
    # A block, if given, is called only when the message will be logged, and
    # its result is appended as the last message part.
    #
    # Never raises: any error while logging is swallowed.
    # @return [String, nil] the written line, or nil when nothing was written
    def warn(*args, &block)
      return if @level > LEVEL_WARN
      type = log_type(args)
      return if skipped_type?(type)
      args.push(block.call) if block
      time, msg = event(:warn, args)
      return unless time
      # In debug mode the :text formatter finds the call site by stack depth,
      # so it must be invoked directly from this method.
      body = @formatter.call(type, time, LEVEL_WARN, msg)
      puts([@color_warn, body, @color_reset].join)
    rescue
      # A logger must not raise into its caller; drop the message instead.
      nil
    end
    alias WARN warn

    # Log +backtrace+ at WARN level (defaults to the backtrace of $!, the
    # exception currently being rescued).
    # @param type [Symbol] log type, one of LOG_TYPES
    def warn_backtrace(backtrace=$!.backtrace, type: :default)
      dump_stacktrace(type, backtrace, LEVEL_WARN)
    end

    # Yield only when ERROR messages are enabled for this logger.
    # @return the block's value, or nil when ERROR is filtered out
    def on_error
      yield unless @level > LEVEL_ERROR
    end

    # Log a message at ERROR level.
    #
    # +args+ may start with a log type (one of LOG_TYPES, default :default),
    # followed by message parts (appended via #to_s) and Hashes of attributes.
    # A block, if given, is called only when the message will be logged, and
    # its result is appended as the last message part.
    #
    # Never raises: any error while logging is swallowed.
    # @return [String, nil] the written line, or nil when nothing was written
    def error(*args, &block)
      return if @level > LEVEL_ERROR
      type = log_type(args)
      return if skipped_type?(type)
      args.push(block.call) if block
      time, msg = event(:error, args)
      return unless time
      # In debug mode the :text formatter finds the call site by stack depth,
      # so it must be invoked directly from this method.
      body = @formatter.call(type, time, LEVEL_ERROR, msg)
      puts([@color_error, body, @color_reset].join)
    rescue
      # A logger must not raise into its caller; drop the message instead.
      nil
    end
    alias ERROR error

    # Log +backtrace+ at ERROR level (defaults to the backtrace of $!, the
    # exception currently being rescued).
    # @param type [Symbol] log type, one of LOG_TYPES
    def error_backtrace(backtrace=$!.backtrace, type: :default)
      dump_stacktrace(type, backtrace, LEVEL_ERROR)
    end

    # Yield only when FATAL messages are enabled for this logger.
    # @return the block's value, or nil when FATAL is filtered out
    def on_fatal
      yield unless @level > LEVEL_FATAL
    end

    # Log a message at FATAL level.
    #
    # +args+ may start with a log type (one of LOG_TYPES, default :default),
    # followed by message parts (appended via #to_s) and Hashes of attributes.
    # A block, if given, is called only when the message will be logged, and
    # its result is appended as the last message part.
    #
    # Never raises: any error while logging is swallowed.
    # @return [String, nil] the written line, or nil when nothing was written
    def fatal(*args, &block)
      return if @level > LEVEL_FATAL
      type = log_type(args)
      return if skipped_type?(type)
      args.push(block.call) if block
      time, msg = event(:fatal, args)
      return unless time
      # In debug mode the :text formatter finds the call site by stack depth,
      # so it must be invoked directly from this method.
      body = @formatter.call(type, time, LEVEL_FATAL, msg)
      puts([@color_fatal, body, @color_reset].join)
    rescue
      # A logger must not raise into its caller; drop the message instead.
      nil
    end
    alias FATAL fatal

    # Log +backtrace+ at FATAL level (defaults to the backtrace of $!, the
    # exception currently being rescued).
    # @param type [Symbol] log type, one of LOG_TYPES
    def fatal_backtrace(backtrace=$!.backtrace, type: :default)
      dump_stacktrace(type, backtrace, LEVEL_FATAL)
    end

    # Write +msg+ followed by "\n" through the wrapped logger, then flush @out.
    # +msg+ must be a String (it is concatenated with "\n").
    # @return [String, nil] +msg+ on success, nil if anything failed
    def puts(msg)
      line = msg + "\n"
      @logger << line
      @out.flush
      msg
    rescue
      # FIXME: failures are swallowed so a broken output device cannot crash
      # the code that is logging.
      nil
    end

    # Write raw +data+ straight to the output device, bypassing formatting.
    # @return whatever the device's #write returns
    def write(data)
      @out.write(data)
    end
    # `#<<` lets other libraries such as aws-sdk use this object as an
    # IO-like logger.
    alias << write

    # Flush the output device.
    def flush
      @out.flush
    end

    # Call #reset on the output device when it supports it; otherwise do nothing.
    # @return the device's #reset result, or nil
    def reset
      return unless @out.respond_to?(:reset)
      @out.reset
    end

    # Last message (or backtrace) remembered per thread for repeat suppression.
    CachedLog = Struct.new(:msg, :time)

    # Whether +message+ equals the CachedLog stored in Thread.current[key] and
    # arrived within @ignore_repeated_log_interval seconds of it. This only
    # reads the cache; callers are responsible for updating it.
    def ignore_repeated_log?(key, time, message)
      cached = Thread.current[key]
      return false if cached.nil?
      same_message = cached.msg == message
      same_message && (time - cached.time <= @ignore_repeated_log_interval)
    end

    # Whether +message+ was already logged by this thread within
    # @ignore_same_log_interval seconds. Records +message+ (with +time+) in
    # the per-thread cache Thread.current[:last_same_log] whenever it is not
    # ignored.
    #
    # The cache holds at most IGNORE_SAME_LOG_MAX_CACHE_SIZE entries: when
    # full, expired entries are evicted first, and if that is not enough the
    # whole cache is cleared.
    def ignore_same_log?(time, message)
      seen = Thread.current[:last_same_log]
      if seen.nil?
        Thread.current[:last_same_log] = { message => time }
        return false
      end

      last_time = seen[message]
      if last_time
        return true if time - last_time <= @ignore_same_log_interval
        seen[message] = time
        return false
      end

      # A new message: make room in the cache first.
      if seen.size >= IGNORE_SAME_LOG_MAX_CACHE_SIZE
        seen.delete_if { |_msg, logged_at| time - logged_at > @ignore_same_log_interval }
      end
      # Still full after evicting expired entries: drop everything.
      seen.clear if seen.size >= IGNORE_SAME_LOG_MAX_CACHE_SIZE
      seen[message] = time
      false
    end

    # Whether +backtrace+ equals the backtrace last cached for this thread in
    # Thread.current[:last_repeated_stacktrace].
    def suppress_stacktrace?(backtrace)
      last = Thread.current[:last_repeated_stacktrace]
      !last.nil? && last.msg == backtrace
    end

    # Write +backtrace+ at +level+, honoring the repeat-suppression options.
    #
    # Text format: one output line per frame, each prefixed with "  " and the
    # #caller_line header. JSON format: one record whose "message" is the
    # frames joined with "\n" (plus "worker_id" when #get_worker_id returns one).
    #
    # Repeats are detected against the per-thread CachedLog stored in
    # Thread.current[:last_repeated_stacktrace]:
    # - if @ignore_repeated_log_interval is set and the same backtrace was cached
    #   within that interval, nothing is written;
    # - else if @suppress_repeated_stacktrace is set and the backtrace equals the
    #   cached one, a single "suppressed same stacktrace" line is written.
    # NOTE(review): a full dump updates the cache only when
    # @suppress_repeated_stacktrace is set, so with only
    # @ignore_repeated_log_interval enabled, repeated backtraces are never
    # ignored here — confirm this is intended.
    #
    # @param type [Symbol] log type (see LOG_TYPES)
    # @param backtrace [Array<String>] frames, e.g. Exception#backtrace
    # @param level [Integer] one of the LEVEL_* values
    # @return [nil]
    def dump_stacktrace(type, backtrace, level)
      return if @level > level

      time = Time.now

      if @format == :text
        # NOTE(review): 5 is the stack depth #caller_line uses to locate the call
        # site in debug mode; it depends on how this method is reached — keep it
        # in sync if that call chain changes.
        line = caller_line(type, time, 5, level)
        if @ignore_repeated_log_interval && ignore_repeated_log?(:last_repeated_stacktrace, time, backtrace)
          return
        elsif @suppress_repeated_stacktrace && suppress_stacktrace?(backtrace)
          puts ["  ", line, 'suppressed same stacktrace'].join
          # Refresh the cached timestamp so the ignore interval restarts from now.
          Thread.current[:last_repeated_stacktrace] = CachedLog.new(backtrace, time) if @ignore_repeated_log_interval
        else
          backtrace.each { |msg|
            puts ["  ", line, msg].join
          }
          Thread.current[:last_repeated_stacktrace] = CachedLog.new(backtrace, time) if @suppress_repeated_stacktrace
        end
      else
        # JSON: same suppression rules, but everything goes into a single record.
        r = {
          'time' => format_time(time),
          'level' => LEVEL_TEXT[level],
        }
        if wid = get_worker_id(type)
          r['worker_id'] = wid
        end

        if @ignore_repeated_log_interval && ignore_repeated_log?(:last_repeated_stacktrace, time, backtrace)
          return
        elsif @suppress_repeated_stacktrace && suppress_stacktrace?(backtrace)
          r['message'] = 'suppressed same stacktrace'
          Thread.current[:last_repeated_stacktrace] = CachedLog.new(backtrace, time) if @ignore_repeated_log_interval
        else
          r['message'] = backtrace.join("\n")
          Thread.current[:last_repeated_stacktrace] = CachedLog.new(backtrace, time) if @suppress_repeated_stacktrace
        end

        puts Yajl.dump(r)
      end

      nil
    end

    # Worker id to attach to a record of log +type+, or nil.
    # Only :default-type messages emitted by a worker process (:worker0 or
    # :workers) carry the id.
    def get_worker_id(type)
      return nil unless type == :default
      %i[worker0 workers].include?(@process_type) ? @worker_id : nil
    end

    # Assemble the message text for one log call, apply repeat suppression and,
    # when enabled, emit it as a Fluentd log event.
    #
    # Non-Hash +args+ are appended via #to_s after @optional_header. Hash args
    # are merged, with stringified keys, over a copy of @optional_attrs and
    # rendered as " key=value.inspect" pairs. An Exception stored under
    # "error" is rendered as " error_class=... error=..." unless an explicit
    # "error_class" attribute is present.
    #
    # @param level [Symbol] level name, used in the event tag "fluent.<level>"
    # @param args [Array] message parts and attribute Hashes
    # @return [Array(Time, String)] the timestamp and message, or [nil, nil]
    #   when the message is suppressed as a repeat
    def event(level, args)
      time = Time.now
      message = @optional_header ? @optional_header.dup : ''
      attrs = @optional_attrs ? @optional_attrs.dup : {}

      args.each do |arg|
        if arg.is_a?(Hash)
          arg.each_pair { |key, value| attrs[key.to_s] = value }
        else
          message << arg.to_s
        end
      end

      explicit_error_class = attrs.has_key?("error_class")
      attrs.each_pair do |key, value|
        message << if key == "error" && value.is_a?(Exception) && !explicit_error_class
                     " error_class=#{value.class} error=#{value.to_s.inspect}"
                   else
                     " #{key}=#{value.inspect}"
                   end
      end

      # ignore_same_log_interval takes precedence over ignore_repeated_log_interval.
      if @ignore_same_log_interval
        return nil, nil if ignore_same_log?(time, message)
      elsif @ignore_repeated_log_interval
        return nil, nil if ignore_repeated_log?(:last_repeated_log, time, message)
        Thread.current[:last_repeated_log] = CachedLog.new(message, time)
      end

      if @log_event_enabled && !@threads_exclude_events.include?(Thread.current)
        record = attrs.dup
        # Values that cannot be msgpack-serialized are sent as their #inspect.
        attrs.each_pair do |key, value|
          record[key] = value.inspect unless value.respond_to?(:to_msgpack)
        end
        record['message'] = message.dup
        @engine.push_log_event("#{LOG_EVENT_TAG_PREFIX}.#{level}", Fluent::EventTime.from_time(time), record)
      end

      [time, message]
    end

    # Build the text-format line header "<time> [<level>]: ", followed by the
    # "#<worker_id> " prefix for :default-type messages in a worker process.
    #
    # In debug mode the header also names the log call site as
    # "<dir>/<file>:<line>:<method>: ". The call site is taken from the stack,
    # skipping +depth+ frames above this method's caller. Callers pass a depth
    # that matches their own call chain, so do not add stack frames between
    # them and this method.
    #
    # @param type [Symbol] log type (see LOG_TYPES)
    # @param time [Time]
    # @param depth [Integer] extra stack frames to skip when locating the call site
    # @param level [Integer] one of the LEVEL_* values
    # @return [String] a new, mutable String (formatters append the message to it)
    def caller_line(type, time, depth, level)
      worker_id_part = if type == :default && (@process_type == :worker0 || @process_type == :workers)
                         @worker_id_part
                       else
                         "".freeze
                       end
      log_msg = "#{format_time(time)} [#{LEVEL_TEXT[level]}]: #{worker_id_part}"
      return log_msg unless @debug_mode

      # Fetch only the one frame we need. `caller` returns nil when the stack is
      # shallower than requested; indexing that nil used to raise here.
      frames = caller(depth + 1, 1)
      frame = frames && frames.first
      # Ruby < 3.4 writes "in `meth'", Ruby 3.4+ writes "in 'Klass#meth'".
      match = frame && /^(.+?):(\d+)(?::in [`'](.*)')?/.match(frame)
      return log_msg unless match

      # Keep at most the last two path components. A bare file name (no "/",
      # e.g. "script.rb" or "-e") is kept as-is instead of crashing on
      # Array#[-2, 2] returning nil.
      file = match[1].split('/').last(2).join('/')
      "#{log_msg}#{file}:#{match[2]}:#{match[3]}: "
    end

    # Render +time+ with the compiled @time_formatter when one is available,
    # otherwise with Time#strftime and @time_format.
    def format_time(time)
      return time.strftime(@time_format) unless @time_formatter
      @time_formatter.exec(time)
    end
  end


  # PluginLogger has its own log level, separate from that of the global $log object.
  # This class enables the log_level option in each plugin.
  #
  # PluginLogger has the same functionality as Log, but some methods are forwarded to the internal logger
  # to keep logging behavior consistent within the process, e.g. color, events, etc.
  class PluginLogger < Log
    # Build a plugin-scoped logger around +logger+ (in this file, a dup of
    # $log; see PluginLoggerMixin#configure). Log#initialize is deliberately
    # not called: level, format and colors are kept here, while output,
    # events and call-site lookup are delegated to +logger+.
    def initialize(logger)
      @logger = logger
      @level = @logger.level
      # Must start as nil so the first #format= below is not treated as a no-op.
      @format = nil
      # NOTE(review): one more than Log's default of 1, presumably because
      # #caller_line is reached through the Forwardable delegator — confirm.
      @depth_offset = 2

      # Inherit the wrapped logger's repeat-suppression settings when it has them.
      %i[@suppress_repeated_stacktrace @ignore_repeated_log_interval @ignore_same_log_interval].each do |ivar|
        next unless logger.instance_variable_defined?(ivar)
        instance_variable_set(ivar, logger.instance_variable_get(ivar))
      end

      self.format = @logger.format
      self.time_format = @logger.time_format
      enable_color @logger.enable_color?
    end

    # Set this plugin's level from a name such as "debug" or "warn".
    def level=(log_level_str)
      @level = Log.str_to_level(log_level_str)
    end

    # Keep Log's implementations reachable under new names before they are
    # overridden below.
    alias orig_format= format=
    alias orig_time_format= time_format=
    alias orig_enable_color enable_color

    # Apply the output format locally and on the wrapped logger.
    def format=(fmt)
      self.orig_format = fmt
      @logger.format = fmt
    end

    # Apply the time format locally and on the wrapped logger.
    def time_format=(fmt)
      self.orig_time_format = fmt
      @logger.time_format = fmt
    end

    # Toggle colors locally and on the wrapped logger.
    def enable_color(b = true)
      orig_enable_color(b)
      @logger.enable_color(b)
    end

    extend Forwardable
    # Methods that touch the shared output device, event routing, call-site
    # lookup or optional header/attrs go straight to the wrapped logger.
    def_delegators '@logger',
                   :get_worker_id, :enable_color?, :enable_debug, :enable_event,
                   :disable_events, :log_event_enabled, :log_event_enabled=,
                   :event, :caller_line, :puts, :write, :<<, :flush, :reset,
                   :out, :out=, :optional_header, :optional_header=,
                   :optional_attrs, :optional_attrs=
  end


  # Gives a plugin its own PluginLogger when it is configured with a plugin id
  # or an explicit @log_level; otherwise the plugin shares the global $log.
  module PluginLoggerMixin
    # Declare the per-plugin @log_level parameter on the including class.
    def self.included(klass)
      klass.instance_eval do
        desc 'Allows the user to set different levels of logging for each plugin.'
        config_param :@log_level, :string, default: nil, alias: :log_level # 'log_level' will be warned as deprecated
      end
    end

    def initialize
      super
      # Share the global logger until #configure decides otherwise.
      @log = $log
    end

    attr_accessor :log

    # After the regular configuration, switch to a dedicated PluginLogger
    # (built from a dup of $log) when a plugin id or @log_level is configured.
    # Its optional attrs are reset, its level is taken from @log_level and,
    # with a plugin id, every message is prefixed with "[<id>] ".
    def configure(conf)
      super

      id_configured = plugin_id_configured?
      log_level = conf['@log_level']
      return unless id_configured || log_level

      @log = PluginLogger.new($log.dup) unless @log.is_a?(PluginLogger)
      @log.optional_attrs = {}
      @log.level = log_level if log_level
      @log.optional_header = "[#{@id}] " if id_configured
    end

    # Reset the logger's output device after the regular termination.
    def terminate
      super
      @log.reset
    end
  end

  # This class delegates some methods used by `Fluent::Log` (e.g. `flush`, `tty?`) to the instance variable (`@dev`) of the `Logger::LogDevice` class
  # https://github.com/ruby/ruby/blob/7b2d47132ff8ee950b0f978ab772dee868d9f1b0/lib/logger.rb#L661
  class LogDeviceIO < ::Logger::LogDevice
    # The three IO-like methods below forward to the wrapped @dev when it
    # supports them, and otherwise defer to Logger::LogDevice.
    # NOTE(review): the stdlib Logger::LogDevice does not appear to define
    # #flush, #tty? or #sync=, so those fallbacks would raise NoMethodError —
    # confirm before relying on them.

    def flush
      return super unless @dev.respond_to?(:flush)
      @dev.flush
    end

    def tty?
      return super unless @dev.respond_to?(:tty?)
      @dev.tty?
    end

    def sync=(v)
      return super unless @dev.respond_to?(:sync=)
      @dev.sync = v
    end

    # Reopen the log at +path+. Only append mode ('a') is supported.
    # @raise [RuntimeError] for any other +mode+
    def reopen(path, mode)
      raise "Unsupported mode: #{mode}" unless mode == 'a'
      super(path)
    end
  end
end