# lib/fluent/plugin/in_tail/position_file.rb
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'fluent/plugin/input'
module Fluent::Plugin
class TailInput < Fluent::Plugin::Input
class PositionFile
UNWATCHED_POSITION = 0xffffffffffffffff
POSITION_FILE_ENTRY_REGEX = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.freeze
def self.load(file, follow_inodes, existing_targets, logger:)
pf = new(file, follow_inodes, logger: logger)
pf.load(existing_targets)
pf
end
def initialize(file, follow_inodes, logger: nil)
@file = file
@logger = logger
@file_mutex = Mutex.new
@map = {}
@follow_inodes = follow_inodes
end
def [](target_info)
if m = @map[@follow_inodes ? target_info.ino : target_info.path]
return m
end
@file_mutex.synchronize {
@file.seek(0, IO::SEEK_END)
seek = @file.pos + target_info.path.bytesize + 1
@file.write "#{target_info.path}\t0000000000000000\t0000000000000000\n"
if @follow_inodes
@map[target_info.ino] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
else
@map[target_info.path] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
end
}
end
def unwatch_removed_targets(existing_targets)
@map.reject { |key, entry|
existing_targets.key?(key)
}.each_key { |key|
unwatch_key(key)
}
end
def unwatch(target_info)
unwatch_key(@follow_inodes ? target_info.ino : target_info.path)
end
def load(existing_targets = nil)
compact(existing_targets)
map = {}
@file_mutex.synchronize do
@file.pos = 0
@file.each_line do |line|
m = POSITION_FILE_ENTRY_REGEX.match(line)
next if m.nil?
path = m[1]
pos = m[2].to_i(16)
ino = m[3].to_i(16)
seek = @file.pos - line.bytesize + path.bytesize + 1
if @follow_inodes
map[ino] = FilePositionEntry.new(@file, @file_mutex, seek, pos, ino)
else
map[path] = FilePositionEntry.new(@file, @file_mutex, seek, pos, ino)
end
end
end
@map = map
end
# This method is similer to #compact but it tries to get less lock to avoid a lock contention
def try_compact
last_modified = nil
size = nil
@file_mutex.synchronize do
size = @file.size
last_modified = @file.mtime
end
entries = fetch_compacted_entries
@logger&.debug "Compacted entries: ", entries.keys
@file_mutex.synchronize do
if last_modified == @file.mtime && size == @file.size
@file.pos = 0
@file.truncate(0)
@file.write(entries.values.map(&:to_entry_fmt).join)
# entry contains path/ino key and value.
entries.each do |key, val|
if (m = @map[key])
m.seek = val.seek
end
end
else
# skip
end
end
end
private
def unwatch_key(key)
if (entry = @map.delete(key))
entry.update_pos(UNWATCHED_POSITION)
end
end
def compact(existing_targets = nil)
@file_mutex.synchronize do
entries = fetch_compacted_entries
@logger&.debug "Compacted entries: ", entries.keys
if existing_targets
entries = remove_deleted_files_entries(entries, existing_targets)
@logger&.debug "Remove missing entries.",
existing_targets: existing_targets.keys,
entries_after_removing: entries.keys
end
@file.pos = 0
@file.truncate(0)
@file.write(entries.values.map(&:to_entry_fmt).join)
end
end
def fetch_compacted_entries
entries = {}
@file.pos = 0
file_pos = 0
@file.each_line do |line|
m = POSITION_FILE_ENTRY_REGEX.match(line)
if m.nil?
@logger.warn "Unparsable line in pos_file: #{line}" if @logger
next
end
path = m[1]
pos = m[2].to_i(16)
ino = m[3].to_i(16)
if pos == UNWATCHED_POSITION
@logger.debug "Remove unwatched line from pos_file: #{line}" if @logger
else
if @follow_inodes
@logger&.warn("#{path} (inode: #{ino}) already exists. use latest one: deleted #{entries[ino]}") if entries.include?(ino)
entries[ino] = Entry.new(path, pos, ino, file_pos + path.bytesize + 1)
else
@logger&.warn("#{path} already exists. use latest one: deleted #{entries[path]}") if entries.include?(path)
entries[path] = Entry.new(path, pos, ino, file_pos + path.bytesize + 1)
end
file_pos += line.size
end
end
entries
end
def remove_deleted_files_entries(existent_entries, existing_targets)
existent_entries.select { |path_or_ino|
existing_targets.key?(path_or_ino)
}
end
end
Entry = Struct.new(:path, :pos, :ino, :seek) do
POSITION_FILE_ENTRY_FORMAT = "%s\t%016x\t%016x\n".freeze
def to_entry_fmt
POSITION_FILE_ENTRY_FORMAT % [path, pos, ino]
end
end
# pos inode
# ffffffffffffffff\tffffffffffffffff\n
class FilePositionEntry
POS_SIZE = 16
INO_OFFSET = 17
INO_SIZE = 16
LN_OFFSET = 33
SIZE = 34
def initialize(file, file_mutex, seek, pos, inode)
@file = file
@file_mutex = file_mutex
@seek = seek
@pos = pos
@inode = inode
end
attr_writer :seek
def update(ino, pos)
@file_mutex.synchronize {
@file.pos = @seek
@file.write "%016x\t%016x" % [pos, ino]
}
@pos = pos
@inode = ino
end
def update_pos(pos)
@file_mutex.synchronize {
@file.pos = @seek
@file.write "%016x" % pos
}
@pos = pos
end
def read_inode
@inode
end
def read_pos
@pos
end
end
class MemoryPositionEntry
def initialize
@pos = 0
@inode = 0
end
def update(ino, pos)
@inode = ino
@pos = pos
end
def update_pos(pos)
@pos = pos
end
def read_pos
@pos
end
def read_inode
@inode
end
end
# Identifies a tail target by its path and inode. PositionFile keys its entries
# by +ino+ when follow_inodes is set and by +path+ otherwise.
TargetInfo = Struct.new(:path, :ino)
end
end