lib/fluent/compat/output.rb
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'fluent/plugin'
require 'fluent/plugin/buffer'
require 'fluent/plugin/output'
require 'fluent/plugin/bare_output'
require 'fluent/compat/call_super_mixin'
require 'fluent/compat/formatter_utils'
require 'fluent/compat/handle_tag_and_time_mixin'
require 'fluent/compat/parser_utils'
require 'fluent/compat/propagate_default'
require 'fluent/compat/record_filter_mixin'
require 'fluent/compat/output_chain'
require 'fluent/timezone'
require 'fluent/mixin'
require 'fluent/process'
require 'fluent/event'
require 'fluent/plugin_helper/compat_parameters'
require 'time'
module Fluent
module Compat
NULL_OUTPUT_CHAIN = NullOutputChain.instance
BufferQueueLimitError = ::Fluent::Plugin::Buffer::BufferOverflowError
module CompatOutputUtils
def self.buffer_section(conf)
conf.elements(name: 'buffer').first
end
def self.secondary_section(conf)
conf.elements(name: 'secondary').first
end
end
module BufferedEventStreamMixin
include Enumerable
def repeatable?
true
end
def each(&block)
msgpack_each(&block)
end
def to_msgpack_stream
read
end
def key
metadata.tag
end
end
module AddTimeSliceKeyToChunkMixin
def time_slice_format=(format)
@_time_slice_format = format
end
def timekey=(unit)
@_timekey = unit
end
def timezone=(tz)
@_timezone = tz
end
def assume_timekey!
@_formatter = Fluent::TimeFormatter.new(@_time_slice_format, nil, @_timezone)
return if self.metadata.timekey
if self.respond_to?(:path) && self.path =~ /\.(\d+)\.(?:b|q)(?:[a-z0-9]+)/
begin
self.metadata.timekey = Time.parse($1, @_time_slice_format).to_i
rescue ArgumentError
# unknown format / value as timekey
end
end
unless self.metadata.timekey
# file creation time is assumed in the time range of that time slice
# because the first record should be in that range.
time_int = self.created_at.to_i
self.metadata.timekey = time_int - (time_int % @_timekey)
end
end
def key
@_formatter.call(self.metadata.timekey)
end
end
module AddKeyToChunkMixin
def key
self.metadata.variables[:key]
end
end
module ChunkSizeCompatMixin
def size
self.bytesize
end
def size_of_events
@size + @adding_size
end
end
module BufferedChunkMixin
# prepend this module to BufferedOutput (including ObjectBufferedOutput) plugin singleton class
def write(chunk)
chunk.extend(ChunkSizeCompatMixin)
chunk.extend(ChunkMessagePackEventStreamer)
chunk.extend(AddKeyToChunkMixin) if chunk.metadata.variables && chunk.metadata.variables.has_key?(:key)
super
end
end
module TimeSliceChunkMixin
# prepend this module to TimeSlicedOutput plugin singleton class
def write(chunk)
chunk.extend(ChunkSizeCompatMixin)
chunk.extend(ChunkMessagePackEventStreamer)
chunk.extend(AddTimeSliceKeyToChunkMixin)
chunk.time_slice_format = @time_slice_format
chunk.timekey = @_timekey
chunk.timezone = @timezone
chunk.assume_timekey!
super
end
end
class Output < Fluent::Plugin::Output
# TODO: warn when deprecated
helpers_internal :event_emitter, :inject
def support_in_v12_style?(feature)
case feature
when :synchronous then true
when :buffered then false
when :delayed_commit then false
when :custom_format then false
end
end
def process(tag, es)
emit(tag, es, NULL_OUTPUT_CHAIN)
end
def initialize
super
unless self.class.ancestors.include?(Fluent::Compat::CallSuperMixin)
self.class.prepend Fluent::Compat::CallSuperMixin
end
end
def configure(conf)
ParserUtils.convert_parser_conf(conf)
FormatterUtils.convert_formatter_conf(conf)
super
end
def start
super
if instance_variable_defined?(:@formatter) && @inject_config
unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
if @formatter.respond_to?(:owner) && !@formatter.owner
@formatter.owner = self
@formatter.singleton_class.prepend FormatterUtils::InjectMixin
end
end
end
end
end
class MultiOutput < Fluent::Plugin::BareOutput
# TODO: warn when deprecated
helpers_internal :event_emitter
def process(tag, es)
emit(tag, es, NULL_OUTPUT_CHAIN)
end
end
class BufferedOutput < Fluent::Plugin::Output
# TODO: warn when deprecated
helpers_internal :event_emitter, :inject
def support_in_v12_style?(feature)
case feature
when :synchronous then false
when :buffered then true
when :delayed_commit then false
when :custom_format then true
end
end
desc 'The buffer type (memory, file)'
config_param :buffer_type, :string, default: 'memory'
desc 'The interval between data flushes.'
config_param :flush_interval, :time, default: 60
config_param :try_flush_interval, :float, default: 1
desc 'If true, the value of `retry_value` is ignored and there is no limit'
config_param :disable_retry_limit, :bool, default: false
desc 'The limit on the number of retries before buffered data is discarded'
config_param :retry_limit, :integer, default: 17
desc 'The initial intervals between write retries.'
config_param :retry_wait, :time, default: 1.0
desc 'The maximum intervals between write retries.'
config_param :max_retry_wait, :time, default: nil
desc 'The number of threads to flush the buffer.'
config_param :num_threads, :integer, default: 1
desc 'The interval between data flushes for queued chunk.'
config_param :queued_chunk_flush_interval, :time, default: 1
desc 'The size of each buffer chunk.'
config_param :buffer_chunk_limit, :size, default: 8*1024*1024
desc 'The length limit of the chunk queue.'
config_param :buffer_queue_limit, :integer, default: 256
desc 'The action when the size of buffer queue exceeds the buffer_queue_limit.'
config_param :buffer_queue_full_action, :enum, list: [:exception, :block, :drop_oldest_chunk], default: :exception
config_param :flush_at_shutdown, :bool, default: true
BUFFER_PARAMS = Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS
def self.propagate_default_params
BUFFER_PARAMS
end
include PropagateDefault
def configure(conf)
bufconf = CompatOutputUtils.buffer_section(conf)
config_style = (bufconf ? :v1 : :v0)
if config_style == :v0
buf_params = {
"flush_mode" => "interval",
"retry_type" => "exponential_backoff",
}
BUFFER_PARAMS.each do |older, newer|
next unless newer
if conf.has_key?(older)
if older == 'buffer_queue_full_action' && conf[older] == 'exception'
buf_params[newer] = 'throw_exception'
else
buf_params[newer] = conf[older]
end
end
end
conf.elements << Fluent::Config::Element.new('buffer', '', buf_params, [])
end
@includes_record_filter = self.class.ancestors.include?(Fluent::Compat::RecordFilterMixin)
methods_of_plugin = self.class.instance_methods(false)
@overrides_emit = methods_of_plugin.include?(:emit)
# RecordFilter mixin uses its own #format_stream method implementation
@overrides_format_stream = methods_of_plugin.include?(:format_stream) || @includes_record_filter
ParserUtils.convert_parser_conf(conf)
FormatterUtils.convert_formatter_conf(conf)
super
if config_style == :v1
unless @buffer_config.chunk_keys.empty?
raise Fluent::ConfigError, "this plugin '#{self.class}' cannot handle arguments for <buffer ...> section"
end
end
self.extend BufferedChunkMixin
if @overrides_emit
self.singleton_class.module_eval do
attr_accessor :last_emit_via_buffer
end
output_plugin = self
m = Module.new do
define_method(:emit) do |key, data, chain|
# receivers of this method are buffer instances
output_plugin.last_emit_via_buffer = [key, data]
end
end
@buffer.extend m
end
end
# original implementation of v0.12 BufferedOutput
def emit(tag, es, chain, key="")
# this method will not be used except for the case that plugin calls super
@emit_count_metrics.inc
data = format_stream(tag, es)
if @buffer.emit(key, data, chain)
submit_flush
end
end
def submit_flush
# nothing todo: blank method to be called from #emit of 3rd party plugins
end
def format_stream(tag, es)
# this method will not be used except for the case that plugin calls super
out = ''
es.each do |time, record|
out << format(tag, time, record)
end
out
end
# #format MUST be implemented in plugin
# #write is also
# This method overrides Fluent::Plugin::Output#handle_stream_simple
# because v0.12 BufferedOutput may overrides #format_stream, but original #handle_stream_simple method doesn't consider about it
def handle_stream_simple(tag, es, enqueue: false)
if @overrides_emit
current_emit_count = @emit_count_metrics.get
size = es.size
key = data = nil
begin
emit(tag, es, NULL_OUTPUT_CHAIN)
key, data = self.last_emit_via_buffer
ensure
@emit_count_metrics.set(current_emit_count)
self.last_emit_via_buffer = nil
end
# on-the-fly key assignment can be done, and it's not configurable if Plugin#emit does it dynamically
meta = @buffer.metadata(variables: (key && !key.empty? ? {key: key} : nil))
write_guard do
@buffer.write({meta => data}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
end
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
return [meta]
end
if @overrides_format_stream
meta = metadata(nil, nil, nil)
size = es.size
bulk = format_stream(tag, es)
write_guard do
@buffer.write({meta => bulk}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
end
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
return [meta]
end
meta = metadata(nil, nil, nil)
size = es.size
data = es.map{|time,record| format(tag, time, record) }
write_guard do
@buffer.write({meta => data}, enqueue: enqueue)
end
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
[meta]
end
def extract_placeholders(str, metadata)
raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API"
end
def initialize
super
unless self.class.ancestors.include?(Fluent::Compat::CallSuperMixin)
self.class.prepend Fluent::Compat::CallSuperMixin
end
end
def start
super
if instance_variable_defined?(:@formatter) && @inject_config
unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
if @formatter.respond_to?(:owner) && !@formatter.owner
@formatter.owner = self
@formatter.singleton_class.prepend FormatterUtils::InjectMixin
end
end
end
end
def detach_process(&block)
log.warn "detach_process is not supported in this version. ignored."
block.call
end
def detach_multi_process(&block)
log.warn "detach_process is not supported in this version. ignored."
block.call
end
end
class ObjectBufferedOutput < Fluent::Plugin::Output
# TODO: warn when deprecated
helpers_internal :event_emitter, :inject
# This plugin cannot inherit BufferedOutput because #configure sets chunk_key 'tag'
# to flush chunks per tags, but BufferedOutput#configure doesn't allow setting chunk_key
# in v1 style configuration
def support_in_v12_style?(feature)
case feature
when :synchronous then false
when :buffered then true
when :delayed_commit then false
when :custom_format then false
end
end
desc 'The buffer type (memory, file)'
config_param :buffer_type, :string, default: 'memory'
desc 'The interval between data flushes.'
config_param :flush_interval, :time, default: 60
config_param :try_flush_interval, :float, default: 1
desc 'If true, the value of `retry_value` is ignored and there is no limit'
config_param :disable_retry_limit, :bool, default: false
desc 'The limit on the number of retries before buffered data is discarded'
config_param :retry_limit, :integer, default: 17
desc 'The initial intervals between write retries.'
config_param :retry_wait, :time, default: 1.0
desc 'The maximum intervals between write retries.'
config_param :max_retry_wait, :time, default: nil
desc 'The number of threads to flush the buffer.'
config_param :num_threads, :integer, default: 1
desc 'The interval between data flushes for queued chunk.'
config_param :queued_chunk_flush_interval, :time, default: 1
desc 'The size of each buffer chunk.'
config_param :buffer_chunk_limit, :size, default: 8*1024*1024
desc 'The length limit of the chunk queue.'
config_param :buffer_queue_limit, :integer, default: 256
desc 'The action when the size of buffer queue exceeds the buffer_queue_limit.'
config_param :buffer_queue_full_action, :enum, list: [:exception, :block, :drop_oldest_chunk], default: :exception
config_param :flush_at_shutdown, :bool, default: true
config_set_default :time_as_integer, true
BUFFER_PARAMS = Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS
def self.propagate_default_params
BUFFER_PARAMS
end
include PropagateDefault
def configure(conf)
bufconf = CompatOutputUtils.buffer_section(conf)
config_style = (bufconf ? :v1 : :v0)
if config_style == :v0
buf_params = {
"flush_mode" => "interval",
"retry_type" => "exponential_backoff",
}
BUFFER_PARAMS.each do |older, newer|
next unless newer
if conf.has_key?(older)
if older == 'buffer_queue_full_action' && conf[older] == 'exception'
buf_params[newer] = 'throw_exception'
else
buf_params[newer] = conf[older]
end
end
end
conf.elements << Fluent::Config::Element.new('buffer', 'tag', buf_params, [])
end
ParserUtils.convert_parser_conf(conf)
FormatterUtils.convert_formatter_conf(conf)
super
if config_style == :v1
if @buffer_config.chunk_keys == ['tag']
raise Fluent::ConfigError, "this plugin '#{self.class}' allows <buffer tag> only"
end
end
self.extend BufferedChunkMixin
end
def format_stream(tag, es) # for BufferedOutputTestDriver
if @compress == :gzip
es.to_compressed_msgpack_stream(time_int: @time_as_integer)
else
es.to_msgpack_stream(time_int: @time_as_integer)
end
end
def write(chunk)
write_objects(chunk.metadata.tag, chunk)
end
def extract_placeholders(str, metadata)
raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API"
end
def initialize
super
unless self.class.ancestors.include?(Fluent::Compat::CallSuperMixin)
self.class.prepend Fluent::Compat::CallSuperMixin
end
end
def start
super
if instance_variable_defined?(:@formatter) && @inject_config
unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
if @formatter.respond_to?(:owner) && !@formatter.owner
@formatter.owner = self
@formatter.singleton_class.prepend FormatterUtils::InjectMixin
end
end
end
end
def detach_process(&block)
log.warn "detach_process is not supported in this version. ignored."
block.call
end
def detach_multi_process(&block)
log.warn "detach_process is not supported in this version. ignored."
block.call
end
end
class TimeSlicedOutput < Fluent::Plugin::Output
# TODO: warn when deprecated
helpers_internal :event_emitter, :inject
def support_in_v12_style?(feature)
case feature
when :synchronous then false
when :buffered then true
when :delayed_commit then false
when :custom_format then true
end
end
desc 'The buffer type (memory, file)'
config_param :buffer_type, :string, default: 'file'
desc 'The interval between data flushes.'
config_param :flush_interval, :time, default: nil
config_param :try_flush_interval, :float, default: 1
desc 'If true, the value of `retry_value` is ignored and there is no limit'
config_param :disable_retry_limit, :bool, default: false
desc 'The limit on the number of retries before buffered data is discarded'
config_param :retry_limit, :integer, default: 17
desc 'The initial intervals between write retries.'
config_param :retry_wait, :time, default: 1.0
desc 'The maximum intervals between write retries.'
config_param :max_retry_wait, :time, default: nil
desc 'The number of threads to flush the buffer.'
config_param :num_threads, :integer, default: 1
desc 'The interval between data flushes for queued chunk.'
config_param :queued_chunk_flush_interval, :time, default: 1
desc 'The time format used as part of the file name.'
config_param :time_slice_format, :string, default: '%Y%m%d'
desc 'The amount of time Fluentd will wait for old logs to arrive.'
config_param :time_slice_wait, :time, default: 10*60
desc 'Parse the time value in the specified timezone'
config_param :timezone, :string, default: nil
desc 'The size of each buffer chunk.'
config_param :buffer_chunk_limit, :size, default: 256*1024*1024
desc 'The length limit of the chunk queue.'
config_param :buffer_queue_limit, :integer, default: 256
desc 'The action when the size of buffer queue exceeds the buffer_queue_limit.'
config_param :buffer_queue_full_action, :enum, list: [:exception, :block, :drop_oldest_chunk], default: :exception
config_param :flush_at_shutdown, :bool, default: false
attr_accessor :localtime
config_section :buffer do
config_set_default :@type, 'file'
end
BUFFER_PARAMS = Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS.merge(Fluent::PluginHelper::CompatParameters::BUFFER_TIME_SLICED_PARAMS)
def initialize
super
@localtime = true
unless self.class.ancestors.include?(Fluent::Compat::CallSuperMixin)
self.class.prepend Fluent::Compat::CallSuperMixin
end
end
def self.propagate_default_params
BUFFER_PARAMS
end
include PropagateDefault
def configure(conf)
bufconf = CompatOutputUtils.buffer_section(conf)
config_style = (bufconf ? :v1 : :v0)
if config_style == :v0
buf_params = {
"flush_mode" => (conf['flush_interval'] ? "interval" : "lazy"),
"retry_type" => "exponential_backoff",
}
BUFFER_PARAMS.each do |older, newer|
next unless newer
if conf.has_key?(older)
if older == 'buffer_queue_full_action' && conf[older] == 'exception'
buf_params[newer] = 'throw_exception'
else
buf_params[newer] = conf[older]
end
end
end
if conf['timezone']
Fluent::Timezone.validate!(conf['timezone'])
elsif conf['utc']
# v0.12 assumes UTC without any configuration
# 'localtime=false && no timezone key' means UTC
conf['localtime'] = "false"
conf.delete('utc')
elsif conf['localtime']
conf['timezone'] = Time.now.strftime('%z')
conf['localtime'] = "true"
else
# v0.12 assumes UTC without any configuration
# 'localtime=false && no timezone key' means UTC
conf['localtime'] = "false"
end
@_timekey = case conf['time_slice_format']
when /\%S/ then 1
when /\%M/ then 60
when /\%H/ then 3600
when /\%d/ then 86400
when nil then 86400 # default value of TimeSlicedOutput.time_slice_format is '%Y%m%d'
else
raise Fluent::ConfigError, "time_slice_format only with %Y or %m is too long"
end
buf_params["timekey"] = @_timekey
conf.elements << Fluent::Config::Element.new('buffer', 'time', buf_params, [])
end
ParserUtils.convert_parser_conf(conf)
FormatterUtils.convert_formatter_conf(conf)
super
if config_style == :v1
if @buffer_config.chunk_keys == ['tag']
raise Fluent::ConfigError, "this plugin '#{self.class}' allows <buffer tag> only"
end
end
self.extend TimeSliceChunkMixin
end
def start
super
if instance_variable_defined?(:@formatter) && @inject_config
unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
if @formatter.respond_to?(:owner) && !@formatter.owner
@formatter.owner = self
@formatter.singleton_class.prepend FormatterUtils::InjectMixin
end
end
end
end
def detach_process(&block)
log.warn "detach_process is not supported in this version. ignored."
block.call
end
def detach_multi_process(&block)
log.warn "detach_process is not supported in this version. ignored."
block.call
end
# Original TimeSlicedOutput#emit doesn't call #format_stream
# #format MUST be implemented in plugin
# #write is also
def extract_placeholders(str, metadata)
raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API"
end
end
end
end