# lib/fluent/event_router.rb
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'fluent/match'
require 'fluent/event'
require 'fluent/filter'
require 'fluent/msgpack_factory'
module Fluent
#
# EventRouter is responsible for routing events to a collector.
#
# It has a list of MatchPattern and Collector pairs:
#
#  +----------------+     +-----------------+
#  |  MatchPattern  |     |    Collector    |
#  +----------------+     +-----------------+
#  |   access.**  --------->  type forward  |
#  |     logs.**  --------->  type copy     |
#  |  archive.**  --------->  type s3       |
#  +----------------+     +-----------------+
#
# EventRouter does:
#
# 1) receives an event through one of the `#emit` methods
# 2) matches the event's tag against the MatchPatterns
# 3) forwards the event to the corresponding Collector
#
# A Collector is one of: an Output, a Filter, or another EventRouter.
#
class EventRouter
# Builds a router that starts with no rules and no metric callbacks.
#
# default_collector  - collector used when no rule matches a tag
# emit_error_handler - object that receives errors raised while emitting
def initialize(default_collector, emit_error_handler)
  @default_collector = default_collector
  @emit_error_handler = emit_error_handler
  @caller_plugin_id = nil
  @metric_callbacks = {}
  @match_rules = []
  @match_cache = MatchCache.new
end
# Both collaborators passed to #initialize can be read and replaced later.
attr_accessor :default_collector
attr_accessor :emit_error_handler
# A Rule pairs a compiled match pattern with the collector registered for it.
class Rule
  attr_reader :collector
  attr_reader :pattern_str

  # pattern   - one or more tag patterns separated by whitespace; several
  #             patterns are combined through OrMatchPattern
  # collector - object exposed through #collector
  def initialize(pattern, collector)
    # String#split without an argument ignores leading and trailing
    # whitespace, so a padded pattern never produces an empty sub-pattern.
    patterns = pattern.split.map { |str| MatchPattern.create(str) }
    @pattern = patterns.length == 1 ? patterns.first : OrMatchPattern.new(patterns)
    @pattern_str = pattern
    @collector = collector
  end

  # Returns the result of matching +tag+ against the compiled pattern.
  def match?(tag)
    @pattern.match(tag)
  end
end
# Forwards the request to the default collector when that collector
# responds to #suppress_missing_match!; otherwise does nothing.
def suppress_missing_match!
  fallback = @default_collector
  return unless fallback.respond_to?(:suppress_missing_match!)

  fallback.suppress_missing_match!
end
# Called by Agent to register a new match pattern together with its
# collector. The rule is appended after all previously added rules.
def add_rule(pattern, collector)
  rule = Rule.new(pattern, collector)
  @match_rules.push(rule)
end
# Registers +callback+ under +caller_plugin_id+, replacing any callback
# stored earlier for the same id.
def add_metric_callbacks(caller_plugin_id, callback)
  @metric_callbacks.store(caller_plugin_id, callback)
end
# Sets the plugin id that #find_callback uses to look up the metric callback.
def caller_plugin_id=(caller_plugin_id)
@caller_plugin_id = caller_plugin_id
end
# Returns the metric callback registered for the current caller plugin id,
# or nil when no caller plugin id has been set.
def find_callback
  return nil unless @caller_plugin_id

  @metric_callbacks[@caller_plugin_id]
end
# Emits a single event. An event whose record is nil is skipped.
def emit(tag, time, record)
  return if record.nil?

  emit_stream(tag, OneEventStream.new(time, record))
end
# Wraps +array+ in an ArrayEventStream and emits it under +tag+.
def emit_array(tag, array)
  stream = ArrayEventStream.new(array)
  emit_stream(tag, stream)
end
# Sends the event stream +es+ to the collector matched for +tag+, then calls
# the metric callback of the current caller plugin when one is registered.
# Any StandardError raised on the way is passed to the emit error handler
# instead of propagating.
def emit_stream(tag, es)
  collector = match(tag)
  collector.emit_events(tag, es)
  metric_callback = find_callback
  metric_callback.call(es) if metric_callback
rescue Pipeline::OutputError => output_error
  # The output of a pipeline failed: report the stream it was given (after
  # filtering) together with the error raised by the output itself.
  @emit_error_handler.handle_emits_error(tag, output_error.processed_es, output_error.internal_error)
rescue => error
  @emit_error_handler.handle_emits_error(tag, es, error)
end
# Hands a single failed event and its error over to the emit error handler.
def emit_error_event(tag, time, record, error)
@emit_error_handler.emit_error_event(tag, time, record, error)
end
# Returns true when #find yields a collector for +tag+, false otherwise.
# The default collector is not taken into account.
def match?(tag)
  find(tag) ? true : false
end
# Returns the collector for +tag+ through the match cache. On a cache miss
# the rules are searched and the default collector is used when none match.
def match(tag)
  @match_cache.get(tag) { find(tag) || @default_collector }
end
# Bounded key => collector cache that evicts entries in insertion order.
class MatchCache
  MATCH_CACHE_SIZE = 1024

  def initialize
    super
    @map = {}
    @keys = []
  end

  # Returns the value cached for +key+. On a miss the block is evaluated, its
  # result is stored and returned, and the oldest key is dropped once
  # MATCH_CACHE_SIZE keys are already being tracked.
  def get(key)
    # Test for key presence instead of value truthiness: a cached nil/false
    # must count as a hit, otherwise the same key would be appended to @keys
    # on every call and its duplicates would evict live entries early.
    return @map[key] if @map.key?(key)

    collector = @map[key] = yield
    # expire the oldest key
    @map.delete(@keys.shift) if @keys.size >= MATCH_CACHE_SIZE
    @keys << key
    collector
  end
end
private
class Pipeline
# Raised by Pipeline#emit_events when the output fails. It carries the error
# raised by the output and the event stream that was passed to that output.
class OutputError < StandardError
  attr_reader :internal_error, :processed_es

  def initialize(internal_error, processed_es)
    @processed_es = processed_es
    @internal_error = internal_error
  end
end
# Creates a pipeline without filters and without an output. The optimizer
# that runs the filter chain starts with its own empty filter list.
def initialize
  @output = nil
  @filters = []
  @optimizer = FilterOptimizer.new
end
# Appends +filter+ to the chain and hands the updated list to the optimizer.
def add_filter(filter)
  @filters.push(filter)
  # Assigning through the setter makes the optimizer drop its memoized
  # decision, so it is recomputed for the new chain.
  @optimizer.filters = @filters
end
# Sets the collector that #emit_events sends the filtered stream to.
def set_output(output)
@output = output
end
# Runs +es+ through the filter chain and passes the result to the output.
#
# Errors raised while filtering propagate unchanged. Errors raised by the
# output are wrapped in OutputError together with the filtered stream.
def emit_events(tag, es)
  filtered_es = @optimizer.filter_stream(tag, es)
  begin
    @output.emit_events(tag, filtered_es)
  rescue => output_failure
    raise OutputError.new(output_failure, filtered_es)
  end
end
class FilterOptimizer
# filters - initial filter chain (empty by default). Whether the chain can
# be optimized is decided lazily, on first use.
def initialize(filters = [])
  @optimizable = nil
  @filters = filters
end
# Replaces the filter list and clears the memoized optimization state so it
# is recomputed for the new list.
def filters=(filters)
@filters = filters
reset_optimization
end
# Applies every filter to +es+ and returns the resulting stream.
#
# When the chain is optimizable, events are filtered one by one in
# #optimized_filter_stream. Otherwise each filter's own #filter_stream is
# called in order on the previous filter's output, and its metrics are
# measured on the stream it returned.
def filter_stream(tag, es)
  return optimized_filter_stream(tag, es) if optimizable?

  @filters.inject(es) do |stream, filter|
    filter.filter_stream(tag, stream).tap { |filtered| filter.measure_metrics(filtered) }
  end
end
private
# Filters +es+ event by event and collects the surviving events in a new
# MultiEventStream.
#
# For every event the filters run in order, each receiving the time and
# record produced by the previous one. An event is dropped as soon as a
# filter returns a falsy record (or a falsy time from #filter_with_time).
# When a filter raises, the error is reported through that filter's router
# and the event continues to the next filter with its values unchanged.
def optimized_filter_stream(tag, es)
  result = MultiEventStream.new
  es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
    current_time = time
    current_record = record

    catch :break_loop do
      @filters.each do |filter|
        # Evaluated outside the rescue so an error here is not swallowed.
        with_time = filter.has_filter_with_time
        begin
          if with_time
            current_time, current_record = filter.filter_with_time(tag, current_time, current_record)
            throw :break_loop unless current_record && current_time
          else
            current_record = filter.filter(tag, current_time, current_record)
            throw :break_loop unless current_record
          end
          # Metrics are measured with the event as it entered the chain.
          filter.measure_metrics(OneEventStream.new(time, record))
        rescue => e
          filter.router.emit_error_event(tag, current_time, current_record, e)
        end
      end

      # Only reached when no filter dropped the event.
      result.add(current_time, current_record)
    end
  end
  result
end
# Tells whether the chain can be filtered event by event. The answer is
# computed once and memoized until #reset_optimization clears it: the chain
# is optimizable only when no filter class defines its own #filter_stream.
def optimizable?
  if @optimizable.nil?
    stream_filters = filters_having_filter_stream
    chain_is_plain = stream_filters.empty?
    # skip log message when filter is only 1, because its performance is same as non optimized chain.
    if !chain_is_plain && @filters.size > 1 && stream_filters.size >= 1
      $log.info "disable filter chain optimization because #{stream_filters.map(&:class)} uses `#filter_stream` method."
    end
    @optimizable = chain_is_plain
  end
  @optimizable
end
# Returns (and memoizes) the filters whose own class defines #filter_stream.
# Methods inherited from a superclass are not considered.
def filters_having_filter_stream
  return @filters_having_filter_stream if @filters_having_filter_stream

  @filters_having_filter_stream = @filters.select do |filter|
    own_methods = filter.class.instance_methods(false)
    own_methods.include?(:filter_stream)
  end
end
# Forgets both memoized values so they are recomputed on next use.
def reset_optimization
  @optimizable = @filters_having_filter_stream = nil
end
end
end
# Resolves the collector for +tag+ by walking the rules in registration order.
#
# Every matching Filter rule is collected into a Pipeline. The first matching
# rule that is not a Filter ends the search: it becomes the pipeline's
# output, or is returned directly when no filter matched before it.
#
# Returns the output itself, a Pipeline, or nil when no rule matches. When
# only filters match, the pipeline's output is the default collector.
def find(tag)
  pipeline = nil
  @match_rules.each do |rule|
    next unless rule.match?(tag)

    collector = rule.collector
    if collector.is_a?(Plugin::Filter)
      pipeline ||= Pipeline.new
      pipeline.add_filter(collector)
    else
      # Use Output directly when filter is not matched
      return collector unless pipeline

      pipeline.set_output(collector)
      return pipeline
    end
  end
  return nil unless pipeline

  # filter is matched but no match
  pipeline.set_output(@default_collector)
  pipeline
end
end
end