fluent/fluentd

View on GitHub
lib/fluent/event_router.rb

Summary

Maintainability
C
7 hrs
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/match'
require 'fluent/event'
require 'fluent/filter'
require 'fluent/msgpack_factory'

module Fluent
  #
  # EventRouter is responsible to route events to a collector.
  #
  # It has a list of MatchPattern and Collector pairs:
  #
  #  +----------------+     +-----------------+
  #  |  MatchPattern  |     |    Collector    |
  #  +----------------+     +-----------------+
  #  |   access.**  ---------> type forward   |
  #  |     logs.**  ---------> type copy      |
  #  |  archive.**  ---------> type s3        |
  #  +----------------+     +-----------------+
  #
  # EventRouter does:
  #
  # 1) receive an event at `#emit` methods
  # 2) match the event's tag with the MatchPatterns
  # 3) forward the event to the corresponding Collector
  #
  # Collector is either of Output, Filter or other EventRouter.
  #
  class EventRouter
    def initialize(default_collector, emit_error_handler)
      @match_rules = []
      @match_cache = MatchCache.new
      @default_collector = default_collector
      @emit_error_handler = emit_error_handler
      @metric_callbacks = {}
      @caller_plugin_id = nil
    end

    attr_accessor :default_collector
    attr_accessor :emit_error_handler

    class Rule
      def initialize(pattern, collector)
        patterns = pattern.split(/\s+/).map { |str| MatchPattern.create(str) }
        @pattern = if patterns.length == 1
                     patterns[0]
                   else
                     OrMatchPattern.new(patterns)
                   end
        @pattern_str = pattern
        @collector = collector
      end

      def match?(tag)
        @pattern.match(tag)
      end

      attr_reader :collector
      attr_reader :pattern_str
    end

    def suppress_missing_match!
      if @default_collector.respond_to?(:suppress_missing_match!)
        @default_collector.suppress_missing_match!
      end
    end

    # called by Agent to add new match pattern and collector
    def add_rule(pattern, collector)
      @match_rules << Rule.new(pattern, collector)
    end

    def add_metric_callbacks(caller_plugin_id, callback)
      @metric_callbacks[caller_plugin_id] = callback
    end

    def caller_plugin_id=(caller_plugin_id)
      @caller_plugin_id = caller_plugin_id
    end

    def find_callback
      if @caller_plugin_id
        @metric_callbacks[@caller_plugin_id]
      else
        nil
      end
    end

    def emit(tag, time, record)
      unless record.nil?
        emit_stream(tag, OneEventStream.new(time, record))
      end
    end

    def emit_array(tag, array)
      emit_stream(tag, ArrayEventStream.new(array))
    end

    def emit_stream(tag, es)
      match(tag).emit_events(tag, es)
      if callback = find_callback
        callback.call(es)
      end
    rescue Pipeline::OutputError => e
      @emit_error_handler.handle_emits_error(tag, e.processed_es, e.internal_error)
    rescue => e
      @emit_error_handler.handle_emits_error(tag, es, e)
    end

    def emit_error_event(tag, time, record, error)
      @emit_error_handler.emit_error_event(tag, time, record, error)
    end

    def match?(tag)
      !!find(tag)
    end

    def match(tag)
      collector = @match_cache.get(tag) {
        find(tag) || @default_collector
      }
      collector
    end

    class MatchCache
      MATCH_CACHE_SIZE = 1024

      def initialize
        super
        @map = {}
        @keys = []
      end

      def get(key)
        if collector = @map[key]
          return collector
        end
        collector = @map[key] = yield
        if @keys.size >= MATCH_CACHE_SIZE
          # expire the oldest key
          @map.delete @keys.shift
        end
        @keys << key
        collector
      end
    end

    private

    class Pipeline

      class OutputError < StandardError
        attr_reader :internal_error
        attr_reader :processed_es

        def initialize(internal_error, processed_es)
          @internal_error = internal_error
          @processed_es = processed_es
        end
      end

      def initialize
        @filters = []
        @output = nil
        @optimizer = FilterOptimizer.new
      end

      def add_filter(filter)
        @filters << filter
        @optimizer.filters = @filters
      end

      def set_output(output)
        @output = output
      end

      def emit_events(tag, es)
        processed = @optimizer.filter_stream(tag, es)

        begin
          @output.emit_events(tag, processed)
        rescue => e
          raise OutputError.new(e, processed)
        end
      end

      class FilterOptimizer
        def initialize(filters = [])
          @filters = filters
          @optimizable = nil
        end

        def filters=(filters)
          @filters = filters
          reset_optimization
        end

        def filter_stream(tag, es)
          if optimizable?
            optimized_filter_stream(tag, es)
          else
            @filters.reduce(es) { |acc, filter|
              filtered_es = filter.filter_stream(tag, acc)
              filter.measure_metrics(filtered_es)
              filtered_es
            }
          end
        end

        private

        def optimized_filter_stream(tag, es)
          new_es = MultiEventStream.new
          es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
            filtered_record = record
            filtered_time = time

            catch :break_loop do
              @filters.each do |filter|
                if filter.has_filter_with_time
                  begin
                    filtered_time, filtered_record = filter.filter_with_time(tag, filtered_time, filtered_record)
                    throw :break_loop unless filtered_record && filtered_time
                    filter.measure_metrics(OneEventStream.new(time, record))
                  rescue => e
                    filter.router.emit_error_event(tag, filtered_time, filtered_record, e)
                  end
                else
                  begin
                    filtered_record = filter.filter(tag, filtered_time, filtered_record)
                    throw :break_loop unless filtered_record
                    filter.measure_metrics(OneEventStream.new(time, record))
                  rescue => e
                    filter.router.emit_error_event(tag, filtered_time, filtered_record, e)
                  end
                end
              end

              new_es.add(filtered_time, filtered_record)
            end
          end
          new_es
        end

        def optimizable?
          return @optimizable unless @optimizable.nil?
          fs_filters = filters_having_filter_stream
          @optimizable = if fs_filters.empty?
                           true
                         else
                           # skip log message when filter is only 1, because its performance is same as non optimized chain.
                           if @filters.size > 1 && fs_filters.size >= 1
                             $log.info "disable filter chain optimization because #{fs_filters.map(&:class)} uses `#filter_stream` method."
                           end
                           false
                         end
        end

        def filters_having_filter_stream
          @filters_having_filter_stream ||= @filters.select do |filter|
            filter.class.instance_methods(false).include?(:filter_stream)
          end
        end

        def reset_optimization
          @optimizable = nil
          @filters_having_filter_stream = nil
        end
      end
    end

    def find(tag)
      pipeline = nil
      @match_rules.each_with_index { |rule, i|
        if rule.match?(tag)
          if rule.collector.is_a?(Plugin::Filter)
            pipeline ||= Pipeline.new
            pipeline.add_filter(rule.collector)
          else
            if pipeline
              pipeline.set_output(rule.collector)
            else
              # Use Output directly when filter is not matched
              pipeline = rule.collector
            end
            return pipeline
          end
        end
      }

      if pipeline
        # filter is matched but no match
        pipeline.set_output(@default_collector)
        pipeline
      else
        nil
      end
    end
  end
end