lib/fluent/plugin/out_file.rb
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'fileutils'
require 'zlib'
require 'time'
require 'fluent/plugin/output'
require 'fluent/config/error'
# TODO remove ...
require 'fluent/plugin/file_util'
module Fluent::Plugin
  # Output plugin (`@type file`) that writes flushed buffer chunks to local files.
  #
  # The `path` parameter is turned into a path template by #generate_path_template,
  # and the template's placeholders are expanded per chunk with #extract_placeholders.
  # Each chunk is written as plain text or gzip. It is either appended to the
  # expanded path (`append true`) or written to the first free `_<index>` file
  # (the default).
  class FileOutput < Output
    Fluent::Plugin.register_output('file', self)

    helpers :formatter, :inject, :compat_parameters

    # Values accepted by the `compress` parameter.
    SUPPORTED_COMPRESS = [:text, :gz, :gzip]
    # Maps a `compress` value to the compression method used when writing
    # (nil means the chunk is written uncompressed).
    SUPPORTED_COMPRESS_MAP = {
      text: nil,
      gz: :gzip,
      gzip: :gzip,
    }

    # Default buffer timekey: one day, in seconds.
    DEFAULT_TIMEKEY = 60 * 60 * 24

    desc "The Path of the file."
    config_param :path, :string

    desc "Specify to add file suffix for bare file path or not."
    config_param :add_path_suffix, :bool, default: true
    desc "The file suffix added to bare file path."
    config_param :path_suffix, :string, default: '.log'
    desc "The flushed chunk is appended to existence file or not."
    config_param :append, :bool, default: false
    desc "Compress flushed file."
    config_param :compress, :enum, list: SUPPORTED_COMPRESS, default: :text
    desc "Execute compression again even when buffer chunk is already compressed."
    config_param :recompress, :bool, default: false
    desc "Create symlink to temporary buffered file when buffer_type is file (disabled on Windows)."
    config_param :symlink_path, :string, default: nil

    config_section :format do
      config_set_default :@type, 'out_file'
    end

    config_section :buffer do
      config_set_default :@type, 'file'
      config_set_default :chunk_keys, ['time']
      config_set_default :timekey, DEFAULT_TIMEKEY
    end

    # Permission used when creating output directories (and symlink directories).
    attr_reader :dir_perm
    attr_accessor :last_written_path # for tests

    # Extended into the buffer by #configure when `symlink_path` is set, the buffer
    # responds to #path, and the platform is not Windows. Keeps a symlink at
    # `symlink_path` pointing at the file of the chunk with the latest timekey.
    module SymlinkBufferMixin
      # Wraps the buffer's #metadata. Records in @latest_metadata the metadata
      # with the largest timekey seen so far (ties go to the most recent one)
      # and returns the metadata unchanged.
      def metadata(timekey: nil, tag: nil, variables: nil)
        metadata = super

        # Sentinel with timekey 0 so the first timekey'd metadata always wins.
        @latest_metadata ||= new_metadata(timekey: 0)
        if metadata.timekey && (metadata.timekey >= @latest_metadata.timekey)
          @latest_metadata = metadata
        end

        metadata
      end

      # Output plugin used to expand placeholders in the symlink path and to
      # provide the directory permission for the symlink's parent directory.
      def output_plugin_for_symlink=(output_plugin)
        @_output_plugin_for_symlink = output_plugin
      end

      # Symlink path template (may contain placeholders).
      def symlink_path=(path)
        @_symlink_path = path
      end

      # Wraps the buffer's #generate_chunk. When the new chunk belongs to the
      # latest metadata, creates the symlink's directory and (re)points the
      # symlink at the chunk's file.
      def generate_chunk(metadata)
        chunk = super
        # "symlink" feature is to link from symlink_path to the latest file chunk. Records with latest
        # timekey will be appended into that file chunk. On the other side, resumed file chunks might NOT
        # have timekey, especially in the cases that resumed file chunks are generated by Fluentd v0.12.
        # These chunks will be enqueued immediately, and will be flushed soon.
        if chunk.metadata == @latest_metadata
          sym_path = @_output_plugin_for_symlink.extract_placeholders(@_symlink_path, chunk)
          FileUtils.mkdir_p(File.dirname(sym_path), mode: @_output_plugin_for_symlink.dir_perm)
          FileUtils.ln_sf(chunk.path, sym_path)
        end
        chunk
      end
    end

    # Configures the plugin. The steps are:
    # - run the compat_parameters conversion for formatter/buffer/inject;
    # - make sure a <buffer> section with a path exists;
    # - build @path_template and validate it (and check writability when not secondary);
    # - create the formatter;
    # - set up optional symlink support;
    # - resolve directory/file permissions, locking and the macOS workaround flag.
    #
    # Raises Fluent::ConfigError in these cases:
    # - `path` contains '*' but the buffer has no timekey;
    # - the path has only some of the timestamp placeholders;
    # - the expanded test path is not writable;
    # - `symlink_path` is used in <secondary>.
    def configure(conf)
      compat_parameters_convert(conf, :formatter, :buffer, :inject, default_chunk_key: "time")

      # A legacy time_slice_format, when given, replaces the timekey-derived
      # time format in the path template (see #generate_path_template).
      configured_time_slice_format = conf['time_slice_format']

      if conf.elements(name: 'buffer').empty?
        conf.add_element('buffer', 'time')
      end
      buffer_conf = conf.elements(name: 'buffer').first
      # Fluent::PluginId#configure is not called yet, so we can't use #plugin_root_dir here.
      if !buffer_conf.has_key?('path') && !(conf['@id'] && system_config.root_dir)
        # The v0.14 file buffer treats path as a directory if '*' is missing.
        # 'dummy_path' prevents the file buffer plugin from raising a configuration
        # error for 'path'; this plugin raises it instead.
        buffer_conf['path'] = conf['path'] || '/tmp/dummy_path'
      end

      if conf.has_key?('utc') || conf.has_key?('localtime')
        param_name = conf.has_key?('utc') ? 'utc' : 'localtime'
        log.warn "'#{param_name}' is deprecated for output plugin. This parameter is used for formatter plugin in compatibility layer. If you want to use same feature, use timekey_use_utc parameter in <buffer> directive instead"
      end

      super

      @compress_method = SUPPORTED_COMPRESS_MAP[@compress]

      if @path.include?('*') && !@buffer_config.timekey
        raise Fluent::ConfigError, "path including '*' must be used with buffer chunk key 'time'"
      end

      path_suffix = @add_path_suffix ? @path_suffix : ''
      # As a <secondary>, the time format comes from the primary plugin's timekey.
      path_timekey = if @chunk_key_time
                       @as_secondary ? @primary_instance.buffer_config.timekey : @buffer_config.timekey
                     else
                       nil
                     end
      @path_template = generate_path_template(@path, path_timekey, @append, @compress_method, path_suffix: path_suffix, time_slice_format: configured_time_slice_format)

      if @as_secondary
        # As a secondary, the primary may use tag/key chunk keys that this path doesn't include,
        # so only time placeholders are validated. The index placeholder can still produce a
        # separate output file per chunk tag/keys even if the original path doesn't include them.
        placeholder_validators(:path, @path_template).select{|v| v.type == :time }.each do |v|
          v.validate!
        end
      else
        placeholder_validate!(:path, @path_template)

        # Expand the template once with a dummy tag and record that satisfy its
        # tag/key placeholders, then check that the resulting path is writable.
        max_tag_index = get_placeholders_tag(@path_template).max || 1
        max_tag_index = 1 if max_tag_index < 1
        dummy_tag = (['a'] * max_tag_index).join('.')
        dummy_record_keys = get_placeholders_keys(@path_template) || ['message']
        dummy_record = Hash[dummy_record_keys.zip(['data'] * dummy_record_keys.size)]

        test_chunk1 = chunk_for_test(dummy_tag, Fluent::EventTime.now, dummy_record)
        test_path = extract_placeholders(@path_template, test_chunk1)
        unless ::Fluent::FileUtil.writable_p?(test_path)
          raise Fluent::ConfigError, "out_file: `#{test_path}` is not writable"
        end
      end

      @formatter = formatter_create

      if @symlink_path && @buffer.respond_to?(:path)
        if @as_secondary
          raise Fluent::ConfigError, "symlink_path option is unavailable in <secondary>: consider to use secondary_file plugin"
        end
        if Fluent.windows?
          log.warn "symlink_path is unavailable on Windows platform. disabled."
          @symlink_path = nil
        else
          @buffer.extend SymlinkBufferMixin
          @buffer.symlink_path = @symlink_path
          @buffer.output_plugin_for_symlink = self
        end
      end

      @dir_perm = system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION
      @file_perm = system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION
      # With more than one worker, #write coordinates through locks.
      @need_lock = system_config.workers > 1

      # https://github.com/fluent/fluentd/issues/3569
      # In append mode on macOS with Ruby >= 2.7.0 and < 3.1.0, #write_without_compression
      # reads the chunk and writes it with IO#write instead of calling Chunk#write_to.
      @need_ruby_on_macos_workaround = false
      if @append && Fluent.macos?
        condition = Gem::Dependency.new('', [">= 2.7.0", "< 3.1.0"])
        @need_ruby_on_macos_workaround = true if condition.match?('', RUBY_VERSION)
      end
    end

    # Safe under multiple workers: #write takes locks when workers > 1.
    def multi_workers_ready?
      true
    end

    # Injects the configured values into the record (inject helper) and formats
    # it with the configured formatter.
    def format(tag, time, record)
      r = inject_values_to_record(tag, time, record)
      @formatter.format(tag, time, r)
    end

    # Writes one buffer chunk to the file for its expanded path.
    #
    # With `append true`, the chunk is appended to that path, under a worker lock
    # when several workers run. Otherwise it goes to the first free `_<index>` file
    # (see #find_filepath_available). The path actually written is stored in
    # #last_written_path.
    def write(chunk)
      path = extract_placeholders(@path_template, chunk)
      FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

      writer = case
               when @compress_method.nil?
                 method(:write_without_compression)
               when @compress_method == :gzip
                 # Recompress unless the chunk is already gzipped and recompress is off.
                 if @buffer.compress != :gzip || @recompress
                   method(:write_gzip_with_compression)
                 else
                   method(:write_gzip_from_gzipped_chunk)
                 end
               else
                 raise "BUG: unknown compression method #{@compress_method}"
               end

      if @append
        if @need_lock
          acquire_worker_lock(path) do
            writer.call(path, chunk)
          end
        else
          writer.call(path, chunk)
        end
      else
        find_filepath_available(path, with_lock: @need_lock) do |actual_path|
          writer.call(actual_path, chunk)
          path = actual_path
        end
      end

      @last_written_path = path
    end

    # Appends the chunk's content, uncompressed, to `path`.
    def write_without_compression(path, chunk)
      File.open(path, "ab", @file_perm) do |f|
        if @need_ruby_on_macos_workaround
          # Read the whole chunk and write it out ourselves on the affected
          # macOS/Ruby versions. Use IO#write rather than IO#puts so the bytes
          # match Chunk#write_to exactly. #puts would add a "\n" to content that
          # lacks a trailing newline (and write "\n" for an empty chunk).
          content = chunk.read()
          f.write(content)
        else
          chunk.write_to(f)
        end
      end
    end

    # Gzip-compresses the chunk's text content and appends it to `path`.
    # Each call appends a new gzip stream to the file.
    def write_gzip_with_compression(path, chunk)
      File.open(path, "ab", @file_perm) do |f|
        gz = Zlib::GzipWriter.new(f)
        chunk.write_to(gz, compressed: :text)
        gz.close
      end
    end

    # Appends the chunk's gzip-compressed bytes to `path` without recompressing.
    # Used when the buffer already stores chunks gzipped.
    def write_gzip_from_gzipped_chunk(path, chunk)
      File.open(path, "ab", @file_perm) do |f|
        chunk.write_to(f, compressed: :gzip)
      end
    end

    # Returns the strftime format used in the path for a given timekey (seconds):
    #   nil        -> ''                (no time part)
    #   0...60     -> '%Y%m%d%H%M%S'
    #   60...3600  -> '%Y%m%d%H%M'
    #   3600...86400 -> '%Y%m%d%H'
    #   otherwise  -> '%Y%m%d'
    def timekey_to_timeformat(timekey)
      case timekey
      when nil then ''
      when 0...60 then '%Y%m%d%H%M%S' # 60 exclusive
      when 60...3600 then '%Y%m%d%H%M'
      when 3600...86400 then '%Y%m%d%H'
      else '%Y%m%d'
      end
    end

    # File-name suffix for a compression method: '.gz' for :gzip, '' for nil.
    # Raises ArgumentError for anything else.
    def compression_suffix(compress)
      case compress
      when :gzip then '.gz'
      when nil then ''
      else
        raise ArgumentError, "unknown compression type #{compress}"
      end
    end

    # Builds the output path template from the configured `path`:
    #
    #   /path/to/dir/file.*      -> /path/to/dir/file.%Y%m%d
    #   /path/to/dir/file.*.data -> /path/to/dir/file.%Y%m%d.data
    #   /path/to/dir/file        -> /path/to/dir/file.%Y%m%d.log
    #   %Y%m%d                   -> %Y%m%d_** (non append)
    #   + .gz (gzipped)
    #
    # `_**` is the index placeholder filled in by #find_filepath_available.
    # If the path already contains some of the timekey's time placeholders, it
    # must contain all of them; otherwise Fluent::ConfigError is raised.
    ## TODO: remove time_slice_format when end of support of compat_parameters
    def generate_path_template(original, timekey, append, compress, path_suffix: '', time_slice_format: nil)
      comp_suffix = compression_suffix(compress)
      index_placeholder = append ? '' : '_**'
      if original.index('*')
        raise "BUG: configuration error must be raised for path including '*' without timekey" unless timekey
        time_placeholders_part = time_slice_format || timekey_to_timeformat(timekey)
        original.gsub('*', time_placeholders_part + index_placeholder) + comp_suffix
      else
        if timekey
          if time_slice_format
            "#{original}.#{time_slice_format}#{index_placeholder}#{path_suffix}#{comp_suffix}"
          else
            time_placeholders = timekey_to_timeformat(timekey)
            if time_placeholders.scan(/../).any?{|ph| original.include?(ph) }
              raise Fluent::ConfigError, "insufficient timestamp placeholders in path" if time_placeholders.scan(/../).any?{|ph| !original.include?(ph) }
              "#{original}#{index_placeholder}#{path_suffix}#{comp_suffix}"
            else
              "#{original}.#{time_placeholders}#{index_placeholder}#{path_suffix}#{comp_suffix}"
            end
          end
        else
          "#{original}#{index_placeholder}#{path_suffix}#{comp_suffix}"
        end
      end
    end

    # Replaces `_**` in the path with `_0`, `_1`, ... and yields the first path
    # that does not exist yet (used only when not appending).
    #
    # With `with_lock: true`, the path is also claimed by creating a `<path>.lock`
    # directory. That directory is removed after the block returns. If another
    # worker created the file between the existence check and the lock, the lock
    # is released immediately and the next index is tried, so no stale `.lock`
    # directory is left behind.
    #
    # Raises RuntimeError if the template has no `_**` placeholder.
    def find_filepath_available(path_with_placeholder, with_lock: false) # for non-append
      raise "BUG: index placeholder not found in path: #{path_with_placeholder}" unless path_with_placeholder.index('_**')

      i = 0
      path = dir_path = locked = nil
      loop do
        path = path_with_placeholder.sub('_**', "_#{i}")
        i += 1
        next if File.exist?(path)
        break unless with_lock

        dir_path = path + '.lock'
        locked = Dir.mkdir(dir_path) rescue false
        next unless locked
        # ensure that other worker doesn't create a file (and release lock)
        # between previous File.exist? and Dir.mkdir
        break unless File.exist?(path)

        # Another worker wrote this path first: give our lock back before
        # trying the next index; otherwise the `.lock` directory would leak.
        Dir.rmdir(dir_path) rescue nil
        locked = false
      end
      yield path
    ensure
      if dir_path && locked && Dir.exist?(dir_path)
        Dir.rmdir(dir_path) rescue nil
      end
    end
  end
end