# lib/fluent/plugin/filter_record_transformer.rb
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'socket'
require 'json'
require 'ostruct'
require 'fluent/plugin/filter'
require 'fluent/config/error'
require 'fluent/event'
require 'fluent/time'
module Fluent::Plugin
class RecordTransformerFilter < Fluent::Plugin::Filter
# Register this class as the `record_transformer` filter plugin.
Fluent::Plugin.register_filter('record_transformer', self)
# Pull in the record_accessor helper; `configure` calls record_accessor_create
# once per entry of `remove_keys`.
helpers :record_accessor
# Keys deleted from every transformed record in `filter_stream`.
desc 'A comma-delimited list of keys to delete.'
config_param :remove_keys, :array, default: nil
# Keys copied from the incoming record into the new record by `reform`.
# `configure` rejects this option unless `renew_record` is true.
desc 'A comma-delimited list of keys to keep.'
config_param :keep_keys, :array, default: nil
# When true, `reform` starts from an empty Hash instead of a copy of the
# incoming record.
desc 'Create new Hash to transform incoming data'
config_param :renew_record, :bool, default: false
# Field of the transformed record whose value (converted with `to_f`) replaces
# the event time in `filter_stream`.
desc 'Specify field name of the record to overwrite the time of events. Its value must be unix time.'
config_param :renew_time_key, :string, default: nil
# Selects RubyPlaceholderExpander instead of PlaceholderExpander in `configure`.
desc 'When set to true, the full Ruby syntax is enabled in the ${...} expression.'
config_param :enable_ruby, :bool, default: false
# Passed to the placeholder expander as `auto_typecast`; when true, a value made
# of exactly one placeholder keeps the placeholder's own type instead of being
# turned into a String.
desc 'Use original value type.'
config_param :auto_typecast, :bool, default: true
# Reads the plugin configuration and prepares everything `filter_stream` needs:
# the record map (@map), the key deleters (@key_deleters), the placeholder
# expander (@placeholder_expander) and the local hostname (@hostname).
#
# @param conf the configuration element; its `record` child elements provide
#   the key/value templates to merge into each event
# @raise [Fluent::ConfigError] if `keep_keys` is set while `renew_record` is false
def configure(conf)
  super

  # Collect every key/value pair declared in <record></record> directives.
  record_map = {}
  conf.elements.each do |element|
    next unless element.name == 'record'

    element.each_pair do |key, raw_value|
      element.has_key?(key) # to suppress unread configuration warning
      record_map[key] = parse_value(raw_value)
    end
  end

  if @keep_keys && !@renew_record
    raise Fluent::ConfigError, "`renew_record` must be true to use `keep_keys`"
  end

  # One accessor per key to remove; stays nil when `remove_keys` is not configured.
  @key_deleters = @remove_keys ? @remove_keys.map { |key| record_accessor_create(key) } : nil

  expander_params = { log: log, auto_typecast: @auto_typecast }
  expander_class =
    if @enable_ruby
      # require utilities which would be used in ruby placeholders
      require 'pathname'
      require 'uri'
      require 'cgi'
      RubyPlaceholderExpander
    else
      PlaceholderExpander
    end
  @placeholder_expander = expander_class.new(expander_params)

  # Let the chosen expander rewrite the map into the form its `expand` expects.
  @map = @placeholder_expander.preprocess_map(record_map)
  @hostname = Socket.gethostname
end
# Transforms every event of a stream and returns the results as a new stream.
#
# An event whose transformation raises a StandardError is handed to
# `router.emit_error_event` and left out of the returned stream.
#
# @param tag [String] tag of the incoming events
# @param es event stream yielding (time, record) pairs
# @return [Fluent::MultiEventStream] stream holding the transformed events
def filter_stream(tag, es)
  parts = tag.split('.')
  # Values shared by all events of this stream; 'time' and 'record' are
  # overwritten for each event inside the loop.
  placeholder_values = {
    'tag' => tag,
    'tag_parts' => parts,
    'tag_prefix' => tag_prefix(parts),
    'tag_suffix' => tag_suffix(parts),
    'hostname' => @hostname,
  }

  result = Fluent::MultiEventStream.new
  es.each do |time, record|
    begin
      placeholder_values['time'] = @placeholder_expander.time_value(time)
      placeholder_values['record'] = record

      transformed = reform(record, placeholder_values)

      # Optionally take the event time from a field of the transformed record.
      if @renew_time_key && transformed.has_key?(@renew_time_key)
        renewed = Time.at(transformed[@renew_time_key].to_f)
        time = Fluent::EventTime.from_time(renewed)
      end

      if @key_deleters
        @key_deleters.each { |deleter| deleter.delete(transformed) }
      end

      result.add(time, transformed)
    rescue => e
      router.emit_error_event(tag, time, record, e)
      log.debug { "map:#{@map} record:#{record} placeholder_values:#{placeholder_values}" }
    end
  end
  result
end
private
# Interprets a configured value: text beginning with `{` or `[` is parsed as
# JSON, anything else is returned untouched.
#
# Any StandardError raised along the way (for example invalid JSON) is logged
# as a warning and the raw value is returned instead.
#
# @param value_str [String] raw value taken from a <record> directive
# @return [Object] the parsed JSON structure, or `value_str` itself
def parse_value(value_str)
  return value_str unless value_str.start_with?('{', '[')

  JSON.parse(value_str)
rescue => e
  log.warn "failed to parse #{value_str} as json. Assuming #{value_str} is a string", error: e
  value_str # emit as string
end
# Builds the output record for one event.
#
# The starting point is either a shallow copy of the incoming record or, when
# `renew_record` is on, a new Hash holding only the `keep_keys` entries present
# in the incoming record. The expanded record map is then merged on top.
#
# @param record [Hash] incoming record (not modified)
# @param placeholder_values [Hash] raw placeholder values for this event
# @return [Hash] the transformed record
def reform(record, placeholder_values)
  placeholders = @placeholder_expander.prepare_placeholders(placeholder_values)

  base =
    if @renew_record
      kept = {}
      if @keep_keys
        @keep_keys.each do |key|
          kept[key] = record[key] if record.has_key?(key)
        end
      end
      kept
    else
      record.dup
    end

  # merge! returns the receiver, i.e. the record being built.
  base.merge!(expand_placeholders(@map, placeholders))
end
# Recursively expands placeholders inside a value taken from the record map.
#
# Strings are expanded directly; Hash keys are expanded with forced
# stringification and Hash values / Array elements are expanded recursively.
# Every other kind of value is returned as is.
#
# @param value [String, Hash, Array, Object] (part of) the record map
# @param placeholders placeholders as produced by the expander's `prepare_placeholders`
# @return [Object] a new structure of the same shape with placeholders expanded
def expand_placeholders(value, placeholders)
  case value
  when String
    @placeholder_expander.expand(value, placeholders)
  when Hash
    value.each_with_object({}) do |(key, child), expanded|
      # Keys are always expanded to strings (third argument: force_stringify).
      expanded_key = @placeholder_expander.expand(key, placeholders, true)
      expanded[expanded_key] = expand_placeholders(child, placeholders)
    end
  when Array
    value.map { |child| expand_placeholders(child, placeholders) }
  else
    value
  end
end
# Builds the cumulative dot-joined prefixes of a tag.
#
#   tag_prefix(%w[a b c]) #=> ["a", "a.b", "a.b.c"]
#
# @param tag_parts [Array<String>] tag split on "."
# @return [Array<String>] one prefix per part; empty when `tag_parts` is empty
def tag_prefix(tag_parts)
  tag_parts.each_with_object([]) do |part, prefixes|
    # The first entry is the first part itself; later ones extend the previous prefix.
    prefixes << (prefixes.empty? ? part : "#{prefixes.last}.#{part}")
  end
end
# Builds the dot-joined suffixes of a tag, longest first.
#
#   tag_suffix(%w[a b c]) #=> ["a.b.c", "b.c", "c"]
#
# @param tag_parts [Array<String>] tag split on "."
# @return [Array<String>] one suffix per part; empty when `tag_parts` is empty
def tag_suffix(tag_parts)
  suffixes = []
  # Walk from the last part backwards, prepending each part to the suffix built so far.
  tag_parts.reverse_each do |part|
    suffixes.unshift(suffixes.empty? ? part : "#{part}.#{suffixes.first}")
  end
  suffixes
end
# THIS CLASS MUST BE THREAD-SAFE
# Expander used when `enable_ruby` is off: placeholders such as `${tag}`,
# `${tag_parts[0]}` or `${record["key"]}` are resolved by plain table lookup.
class PlaceholderExpander
  attr_reader :placeholders, :log

  # @param params [Hash] expects :log (logger used for warnings) and
  #   :auto_typecast (keep the original type of single-placeholder values)
  def initialize(params)
    @log = params[:log]
    @auto_typecast = params[:auto_typecast]
  end

  # @param time event time, anything accepted by Time.at
  # @return [String] the time rendered with Time#to_s
  def time_value(time)
    Time.at(time).to_s
  end

  # No preprocessing is needed for table lookup; the map is returned unchanged.
  def preprocess_map(value, force_stringify = false)
    value
  end

  # Flattens the raw placeholder values into a lookup table keyed by the
  # literal placeholder text.
  #
  # @param placeholder_values [Hash] name => value (String, Array, Hash, ...)
  # @return [Hash] placeholder text => value
  def prepare_placeholders(placeholder_values)
    placeholder_values.each_with_object({}) do |(key, value), table|
      case value
      when Array # tag_parts, etc
        count = value.size
        value.each_with_index do |item, idx|
          table["${#{key}[#{idx}]}"] = item
          table["${#{key}[#{idx - count}]}"] = item # support [-1]
        end
      when Hash # record, etc
        value.each do |k, v|
          name = %Q[${#{key}["#{k}"]}] # record["foo"]
          table[name] = v
        end
      else # string, integer, float, and others?
        table["${#{key}}"] = value
      end
    end
  end

  # Expand string with placeholders
  #
  # With auto_typecast enabled, a string that consists of exactly one
  # placeholder yields the placeholder's value as is (no string conversion).
  #
  # @param [String] str
  # @param [Hash] placeholders table built by prepare_placeholders
  # @param [Boolean] force_stringify the value must be string, used for hash key
  def expand(str, placeholders, force_stringify = false)
    if @auto_typecast && !force_stringify
      whole = str.match(/\A(\${[^}]+}|__[A-Z_]+__)\z/)
      if whole
        name = whole[1]
        log_if_unknown_placeholder(name, placeholders)
        return placeholders[name]
      end
    end

    str.gsub(/(\${[^}]+}|__[A-Z_]+__)/) do |token|
      log_if_unknown_placeholder(token, placeholders)
      placeholders[token]
    end
  end

  private

  # Warns when the placeholder text has no entry in the lookup table.
  def log_if_unknown_placeholder(placeholder, placeholders)
    return if placeholders.include?(placeholder)

    log.warn "unknown placeholder `#{placeholder}` found"
  end
end
# THIS CLASS MUST BE THREAD-SAFE
class RubyPlaceholderExpander
attr_reader :log
def initialize(params)
@log = params[:log]
@auto_typecast = params[:auto_typecast]
@cleanroom_expander = CleanroomExpander.new
end
def time_value(time)
Time.at(time)
end
# Preprocess record map to convert into ruby string expansion
#
# @param [Hash|String|Array] value record map config
# @param [Boolean] force_stringify the value must be string, used for hash key
def preprocess_map(value, force_stringify = false)
new_value = nil
if value.is_a?(String)
if @auto_typecast && !force_stringify
num_placeholders = value.scan('${').size
if num_placeholders == 1 && value.start_with?('${') && value.end_with?('}')
new_value = value[2..-2] # ${..} => ..
end
end
unless new_value
new_value = "%Q[#{value.gsub('${', '#{')}]" # xx${..}xx => %Q[xx#{..}xx]
end
elsif value.is_a?(Hash)
new_value = {}
value.each_pair do |k, v|
new_value[preprocess_map(k, true)] = preprocess_map(v)
end
elsif value.is_a?(Array)
new_value = []
value.each_with_index do |v, i|
new_value[i] = preprocess_map(v)
end
else
new_value = value
end
new_value
end
def prepare_placeholders(placeholder_values)
placeholder_values
end
# Expand string with placeholders
#
# @param [String] str
def expand(str, placeholders, force_stringify = false)
@cleanroom_expander.expand(
str,
placeholders['tag'],
placeholders['time'],
placeholders['record'],
placeholders['tag_parts'],
placeholders['tag_prefix'],
placeholders['tag_suffix'],
placeholders['hostname'],
)
rescue => e
raise "failed to expand `#{str}` : error = #{e}"
end
class CleanroomExpander
def expand(__str_to_eval__, tag, time, record, tag_parts, tag_prefix, tag_suffix, hostname)
instance_eval(__str_to_eval__)
end
(Object.instance_methods).each do |m|
undef_method m unless /^__|respond_to_missing\?|object_id|public_methods|instance_eval|method_missing|define_singleton_method|respond_to\?|new_ostruct_member|^class$/.match?(m.to_s)
end
end
end
end
end