lib/fluent/log.rb
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'forwardable'
require 'logger'
module Fluent
class Log
module TTYColor
RESET = "\033]R"
CRE = "\033[K"
CLEAR = "\033c"
NORMAL = "\033[0;39m"
RED = "\033[1;31m"
GREEN = "\033[1;32m"
YELLOW = "\033[1;33m"
BLUE = "\033[1;34m"
MAGENTA = "\033[1;35m"
CYAN = "\033[1;36m"
WHITE = "\033[1;37m"
end
LEVEL_TRACE = 0
LEVEL_DEBUG = 1
LEVEL_INFO = 2
LEVEL_WARN = 3
LEVEL_ERROR = 4
LEVEL_FATAL = 5
LEVEL_TEXT = %w(trace debug info warn error fatal)
LOG_EVENT_TAG_PREFIX = 'fluent'
LOG_EVENT_LABEL = '@FLUENT_LOG'
LOG_TYPE_SUPERVISOR = :supervisor # only in supervisor, or a worker with --no-supervisor
LOG_TYPE_WORKER0 = :worker0 # only in a worker with worker_id=0 (without showing worker id)
LOG_TYPE_DEFAULT = :default # show logs in all supervisor/workers, with worker id in workers (default)
LOG_TYPES = [LOG_TYPE_SUPERVISOR, LOG_TYPE_WORKER0, LOG_TYPE_DEFAULT].freeze
LOG_ROTATE_AGE = %w(daily weekly monthly)
IGNORE_SAME_LOG_MAX_CACHE_SIZE = 1000 # If need, make this an option of system config.
def self.str_to_level(log_level_str)
case log_level_str.downcase
when "trace" then LEVEL_TRACE
when "debug" then LEVEL_DEBUG
when "info" then LEVEL_INFO
when "warn" then LEVEL_WARN
when "error" then LEVEL_ERROR
when "fatal" then LEVEL_FATAL
else raise "Unknown log level: level = #{log_level_str}"
end
end
def self.event_tags
LEVEL_TEXT.map{|t| "#{LOG_EVENT_TAG_PREFIX}.#{t}" }
end
# Create a unique path for each process.
#
# >>> per_process_path("C:/tmp/test.log", :worker, 1)
# C:/tmp/test-1.log
# >>> per_process_path("C:/tmp/test.log", :supervisor, 0)
# C:/tmp/test-supervisor-0.log
def self.per_process_path(path, process_type, worker_id)
path = Pathname(path)
ext = path.extname
if process_type == :supervisor
suffix = "-#{process_type}-0#{ext}" # "-0" for backword compatibility.
else
suffix = "-#{worker_id}#{ext}"
end
return path.sub_ext(suffix).to_s
end
def initialize(logger, opts={})
# When ServerEngine changes the logger.level, the Fluentd logger level should also change.
# So overwrites logger.level= below.
# However, currently Fluentd doesn't use the ServerEngine's reloading feature,
# so maybe we don't need this overwriting anymore.
orig_logger_level_setter = logger.class.public_instance_method(:level=).bind(logger)
me = self
# The original ruby logger sets the number as each log level like below.
# DEBUG = 0
# INFO = 1
# WARN = 2
# ERROR = 3
# FATAL = 4
# Serverengine use this original log number. In addition to this, serverengine sets -1 as TRACE level.
# TRACE = -1
#
# On the other hand, in fluentd side, it sets the number like below.
# TRACE = 0
# DEBUG = 1
# INFO = 2
# WARN = 3
# ERROR = 4
# FATAL = 5
#
# Then fluentd's level is set as serverengine's level + 1.
# So if serverengine's logger level is changed, fluentd's log level will be changed to that + 1.
logger.define_singleton_method(:level=) {|level| orig_logger_level_setter.call(level); me.level = self.level + 1 }
@path = opts[:path]
@logger = logger
@out = logger.instance_variable_get(:@logdev)
@level = logger.level + 1
@debug_mode = false
@log_event_enabled = false
@depth_offset = 1
@format = nil
@time_format = nil
@formatter = nil
self.format = opts.fetch(:format, :text)
self.time_format = opts[:time_format] if opts.key?(:time_format)
enable_color out.tty?
# TODO: This variable name is unclear so we should change to better name.
@threads_exclude_events = []
# Fluent::Engine requires Fluent::Log, so we must take that object lazily
@engine = Fluent.const_get('Engine')
@optional_header = nil
@optional_attrs = nil
@suppress_repeated_stacktrace = opts[:suppress_repeated_stacktrace]
@ignore_repeated_log_interval = opts[:ignore_repeated_log_interval]
@ignore_same_log_interval = opts[:ignore_same_log_interval]
@process_type = opts[:process_type] # :supervisor, :worker0, :workers Or :standalone
@process_type ||= :standalone # to keep behavior of existing code
case @process_type
when :supervisor
@show_supervisor_log = true
@show_worker0_log = false
when :worker0
@show_supervisor_log = false
@show_worker0_log = true
when :workers
@show_supervisor_log = false
@show_worker0_log = false
when :standalone
@show_supervisor_log = true
@show_worker0_log = true
else
raise "BUG: unknown process type for logger:#{@process_type}"
end
@worker_id = opts[:worker_id]
@worker_id_part = "##{@worker_id} " # used only for :default log type in workers
end
def dup
dl_opts = {}
dl_opts[:log_level] = @level - 1
logger = ServerEngine::DaemonLogger.new(@out, dl_opts)
clone = self.class.new(logger, suppress_repeated_stacktrace: @suppress_repeated_stacktrace, process_type: @process_type,
worker_id: @worker_id, ignore_repeated_log_interval: @ignore_repeated_log_interval,
ignore_same_log_interval: @ignore_same_log_interval)
clone.format = @format
clone.time_format = @time_format
clone.log_event_enabled = @log_event_enabled
# optional headers/attrs are not copied, because new PluginLogger should have another one of it
clone
end
attr_reader :format
attr_reader :time_format
attr_accessor :log_event_enabled, :ignore_repeated_log_interval, :ignore_same_log_interval, :suppress_repeated_stacktrace
attr_accessor :out
# Strictly speaking, we should also change @logger.level when the setter of @level is called.
# Currently, we don't need to do it, since Fluentd::Log doesn't use ServerEngine::DaemonLogger.level.
# Since We overwrites logger.level= so that @logger.level is applied to @level,
# we need to find a good way to do this, otherwise we will end up in an endless loop.
attr_accessor :level
attr_accessor :optional_header, :optional_attrs
def logdev=(logdev)
@out = logdev
@logger.instance_variable_set(:@logdev, logdev)
nil
end
def format=(fmt)
return if @format == fmt
@time_format = '%Y-%m-%d %H:%M:%S %z'
@time_formatter = Strftime.new(@time_format) rescue nil
case fmt
when :text
@format = :text
@formatter = Proc.new { |type, time, level, msg|
r = caller_line(type, time, @depth_offset, level)
r << msg
r
}
when :json
@format = :json
@formatter = Proc.new { |type, time, level, msg|
r = {
'time' => format_time(time),
'level' => LEVEL_TEXT[level],
'message' => msg
}
if wid = get_worker_id(type)
r['worker_id'] = wid
end
Yajl.dump(r)
}
end
nil
end
def time_format=(time_fmt)
@time_format = time_fmt
@time_formatter = Strftime.new(@time_format) rescue nil
end
def stdout?
@out == $stdout
end
def reopen!
@out.reopen(@path, "a") if @path && @path != "-"
nil
end
def enable_debug(b=true)
@debug_mode = b
self
end
def enable_event(b=true)
@log_event_enabled = b
self
end
# If you want to suppress event emitting in specific thread, please use this method.
# Events in passed thread are never emitted.
def disable_events(thread)
# this method is not symmetric with #enable_event.
@threads_exclude_events.push(thread) unless @threads_exclude_events.include?(thread)
end
def enable_color?
!@color_reset.empty?
end
def enable_color(b=true)
if b
@color_trace = TTYColor::BLUE
@color_debug = TTYColor::WHITE
@color_info = TTYColor::GREEN
@color_warn = TTYColor::YELLOW
@color_error = TTYColor::MAGENTA
@color_fatal = TTYColor::RED
@color_reset = TTYColor::NORMAL
else
@color_trace = ''
@color_debug = ''
@color_info = ''
@color_warn = ''
@color_error = ''
@color_fatal = ''
@color_reset = ''
end
self
end
def log_type(args)
if LOG_TYPES.include?(args.first)
args.shift
else
LOG_TYPE_DEFAULT
end
end
# TODO: skip :worker0 logs when Fluentd gracefully restarted
def skipped_type?(type)
case type
when LOG_TYPE_DEFAULT
false
when LOG_TYPE_WORKER0
!@show_worker0_log
when LOG_TYPE_SUPERVISOR
!@show_supervisor_log
else
raise "BUG: unknown log type:#{type}"
end
end
def on_trace
return if @level > LEVEL_TRACE
yield
end
def trace(*args, &block)
return if @level > LEVEL_TRACE
type = log_type(args)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:trace, args)
return if time.nil?
puts [@color_trace, @formatter.call(type, time, LEVEL_TRACE, msg), @color_reset].join
rescue
# logger should not raise an exception. This rescue prevents unexpected behaviour.
end
alias TRACE trace
def trace_backtrace(backtrace=$!.backtrace, type: :default)
dump_stacktrace(type, backtrace, LEVEL_TRACE)
end
def on_debug
return if @level > LEVEL_DEBUG
yield
end
def debug(*args, &block)
return if @level > LEVEL_DEBUG
type = log_type(args)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:debug, args)
return if time.nil?
puts [@color_debug, @formatter.call(type, time, LEVEL_DEBUG, msg), @color_reset].join
rescue
end
alias DEBUG debug
def debug_backtrace(backtrace=$!.backtrace, type: :default)
dump_stacktrace(type, backtrace, LEVEL_DEBUG)
end
def on_info
return if @level > LEVEL_INFO
yield
end
def info(*args, &block)
return if @level > LEVEL_INFO
type = log_type(args)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:info, args)
return if time.nil?
puts [@color_info, @formatter.call(type, time, LEVEL_INFO, msg), @color_reset].join
rescue
end
alias INFO info
def info_backtrace(backtrace=$!.backtrace, type: :default)
dump_stacktrace(type, backtrace, LEVEL_INFO)
end
def on_warn
return if @level > LEVEL_WARN
yield
end
def warn(*args, &block)
return if @level > LEVEL_WARN
type = log_type(args)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:warn, args)
return if time.nil?
puts [@color_warn, @formatter.call(type, time, LEVEL_WARN, msg), @color_reset].join
rescue
end
alias WARN warn
def warn_backtrace(backtrace=$!.backtrace, type: :default)
dump_stacktrace(type, backtrace, LEVEL_WARN)
end
def on_error
return if @level > LEVEL_ERROR
yield
end
def error(*args, &block)
return if @level > LEVEL_ERROR
type = log_type(args)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:error, args)
return if time.nil?
puts [@color_error, @formatter.call(type, time, LEVEL_ERROR, msg), @color_reset].join
rescue
end
alias ERROR error
def error_backtrace(backtrace=$!.backtrace, type: :default)
dump_stacktrace(type, backtrace, LEVEL_ERROR)
end
def on_fatal
return if @level > LEVEL_FATAL
yield
end
def fatal(*args, &block)
return if @level > LEVEL_FATAL
type = log_type(args)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:fatal, args)
return if time.nil?
puts [@color_fatal, @formatter.call(type, time, LEVEL_FATAL, msg), @color_reset].join
rescue
end
alias FATAL fatal
def fatal_backtrace(backtrace=$!.backtrace, type: :default)
dump_stacktrace(type, backtrace, LEVEL_FATAL)
end
def puts(msg)
@logger << msg + "\n"
@out.flush
msg
rescue
# FIXME
nil
end
def write(data)
@out.write(data)
end
# We need `#<<` method to use this logger class with other
# libraries such as aws-sdk
alias << write
def flush
@out.flush
end
def reset
@out.reset if @out.respond_to?(:reset)
end
CachedLog = Struct.new(:msg, :time)
def ignore_repeated_log?(key, time, message)
cached_log = Thread.current[key]
return false if cached_log.nil?
(cached_log.msg == message) && (time - cached_log.time <= @ignore_repeated_log_interval)
end
def ignore_same_log?(time, message)
cached_log = Thread.current[:last_same_log]
if cached_log.nil?
Thread.current[:last_same_log] = {message => time}
return false
end
prev_time = cached_log[message]
if prev_time
if (time - prev_time) <= @ignore_same_log_interval
true
else
cached_log[message] = time
false
end
else
if cached_log.size >= IGNORE_SAME_LOG_MAX_CACHE_SIZE
cached_log.reject! do |_, cached_time|
(time - cached_time) > @ignore_same_log_interval
end
end
# If the size is still over, we have no choice but to clear it.
cached_log.clear if cached_log.size >= IGNORE_SAME_LOG_MAX_CACHE_SIZE
cached_log[message] = time
false
end
end
def suppress_stacktrace?(backtrace)
cached_log = Thread.current[:last_repeated_stacktrace]
return false if cached_log.nil?
cached_log.msg == backtrace
end
def dump_stacktrace(type, backtrace, level)
return if @level > level
time = Time.now
if @format == :text
line = caller_line(type, time, 5, level)
if @ignore_repeated_log_interval && ignore_repeated_log?(:last_repeated_stacktrace, time, backtrace)
return
elsif @suppress_repeated_stacktrace && suppress_stacktrace?(backtrace)
puts [" ", line, 'suppressed same stacktrace'].join
Thread.current[:last_repeated_stacktrace] = CachedLog.new(backtrace, time) if @ignore_repeated_log_interval
else
backtrace.each { |msg|
puts [" ", line, msg].join
}
Thread.current[:last_repeated_stacktrace] = CachedLog.new(backtrace, time) if @suppress_repeated_stacktrace
end
else
r = {
'time' => format_time(time),
'level' => LEVEL_TEXT[level],
}
if wid = get_worker_id(type)
r['worker_id'] = wid
end
if @ignore_repeated_log_interval && ignore_repeated_log?(:last_repeated_stacktrace, time, backtrace)
return
elsif @suppress_repeated_stacktrace && suppress_stacktrace?(backtrace)
r['message'] = 'suppressed same stacktrace'
Thread.current[:last_repeated_stacktrace] = CachedLog.new(backtrace, time) if @ignore_repeated_log_interval
else
r['message'] = backtrace.join("\n")
Thread.current[:last_repeated_stacktrace] = CachedLog.new(backtrace, time) if @suppress_repeated_stacktrace
end
puts Yajl.dump(r)
end
nil
end
def get_worker_id(type)
if type == :default && (@process_type == :worker0 || @process_type == :workers)
@worker_id
else
nil
end
end
def event(level, args)
time = Time.now
message = @optional_header ? @optional_header.dup : ''
map = @optional_attrs ? @optional_attrs.dup : {}
args.each {|a|
if a.is_a?(Hash)
a.each_pair {|k,v|
map[k.to_s] = v
}
else
message << a.to_s
end
}
map.each_pair {|k,v|
if k == "error".freeze && v.is_a?(Exception) && !map.has_key?("error_class")
message << " error_class=#{v.class.to_s} error=#{v.to_s.inspect}"
else
message << " #{k}=#{v.inspect}"
end
}
if @ignore_same_log_interval
if ignore_same_log?(time, message)
return nil, nil
end
elsif @ignore_repeated_log_interval
if ignore_repeated_log?(:last_repeated_log, time, message)
return nil, nil
else
Thread.current[:last_repeated_log] = CachedLog.new(message, time)
end
end
if @log_event_enabled && !@threads_exclude_events.include?(Thread.current)
record = map.dup
record.keys.each {|key|
record[key] = record[key].inspect unless record[key].respond_to?(:to_msgpack)
}
record['message'] = message.dup
@engine.push_log_event("#{LOG_EVENT_TAG_PREFIX}.#{level}", Fluent::EventTime.from_time(time), record)
end
return time, message
end
def caller_line(type, time, depth, level)
worker_id_part = if type == :default && (@process_type == :worker0 || @process_type == :workers)
@worker_id_part
else
"".freeze
end
log_msg = "#{format_time(time)} [#{LEVEL_TEXT[level]}]: #{worker_id_part}"
if @debug_mode
line = caller(depth+1)[0]
if match = /^(.+?):(\d+)(?::in `(.*)')?/.match(line)
file = match[1].split('/')[-2,2].join('/')
line = match[2]
method = match[3]
return "#{log_msg}#{file}:#{line}:#{method}: "
end
end
return log_msg
end
def format_time(time)
@time_formatter ? @time_formatter.exec(time) : time.strftime(@time_format)
end
end
# PluginLogger has own log level separated from global $log object.
# This class enables log_level option in each plugin.
#
# PluginLogger has same functionality as Log but some methods are forwarded to internal logger
# for keeping logging action consistency in the process, e.g. color, event, etc.
class PluginLogger < Log
def initialize(logger)
@logger = logger
@level = @logger.level
@format = nil
@depth_offset = 2
if logger.instance_variable_defined?(:@suppress_repeated_stacktrace)
@suppress_repeated_stacktrace = logger.instance_variable_get(:@suppress_repeated_stacktrace)
end
if logger.instance_variable_defined?(:@ignore_repeated_log_interval)
@ignore_repeated_log_interval = logger.instance_variable_get(:@ignore_repeated_log_interval)
end
if logger.instance_variable_defined?(:@ignore_same_log_interval)
@ignore_same_log_interval = logger.instance_variable_get(:@ignore_same_log_interval)
end
self.format = @logger.format
self.time_format = @logger.time_format
enable_color @logger.enable_color?
end
def level=(log_level_str)
@level = Log.str_to_level(log_level_str)
end
alias orig_format= format=
alias orig_time_format= time_format=
alias orig_enable_color enable_color
def format=(fmt)
self.orig_format = fmt
@logger.format = fmt
end
def time_format=(fmt)
self.orig_time_format = fmt
@logger.time_format = fmt
end
def enable_color(b = true)
orig_enable_color b
@logger.enable_color b
end
extend Forwardable
def_delegators '@logger', :get_worker_id, :enable_color?, :enable_debug, :enable_event,
:disable_events, :log_event_enabled, :log_event_enabled=, :event, :caller_line, :puts, :write,
:<<, :flush, :reset, :out, :out=, :optional_header, :optional_header=, :optional_attrs,
:optional_attrs=
end
module PluginLoggerMixin
def self.included(klass)
klass.instance_eval {
desc 'Allows the user to set different levels of logging for each plugin.'
config_param :@log_level, :string, default: nil, alias: :log_level # 'log_level' will be warned as deprecated
}
end
def initialize
super
@log = $log # Use $log object directly by default
end
attr_accessor :log
def configure(conf)
super
if plugin_id_configured? || conf['@log_level']
@log = PluginLogger.new($log.dup) unless @log.is_a?(PluginLogger)
@log.optional_attrs = {}
if level = conf['@log_level']
@log.level = level
end
if plugin_id_configured?
@log.optional_header = "[#{@id}] "
end
end
end
def terminate
super
@log.reset
end
end
# This class delegates some methods which are used in `Fluent::Logger` to a instance variable(`dev`) in `Logger::LogDevice` class
# https://github.com/ruby/ruby/blob/7b2d47132ff8ee950b0f978ab772dee868d9f1b0/lib/logger.rb#L661
class LogDeviceIO < ::Logger::LogDevice
def flush
if @dev.respond_to?(:flush)
@dev.flush
else
super
end
end
def tty?
if @dev.respond_to?(:tty?)
@dev.tty?
else
super
end
end
def sync=(v)
if @dev.respond_to?(:sync=)
@dev.sync = v
else
super
end
end
def reopen(path, mode)
if mode != 'a'
raise "Unsupported mode: #{mode}"
end
super(path)
end
end
end