fluent/fluentd

View on GitHub
lib/fluent/plugin/filter.rb

Summary

Maintainability
A
2 hrs
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/plugin/base'

require 'fluent/event'
require 'fluent/log'
require 'fluent/plugin_id'
require 'fluent/plugin_helper'

module Fluent
  module Plugin
    # Base class for filter plugins.
    #
    # A concrete filter implements exactly one of these per-record hooks:
    # * `filter(tag, time, record)` - returns the (possibly modified) record,
    #   or a falsy value to drop the event; the event keeps its original time.
    # * `filter_with_time(tag, time, record)` - returns `[time, record]`; the
    #   event is dropped unless both elements are truthy.
    # Alternatively it overrides `filter_stream(tag, es)` to process a whole
    # event stream itself. Which per-record hook is used is decided once, in
    # #initialize, so a plugin that implements neither (or both) fails at
    # construction time rather than on the first event.
    class Filter < Base
      include PluginId
      include PluginLoggerMixin
      include PluginHelper::Mixin

      helpers_internal :event_emitter, :metrics

      # true  -> #filter_stream calls `filter_with_time`
      # false -> #filter_stream calls `filter`
      # nil   -> the plugin overrides `filter_stream`, so neither hook is checked
      attr_reader :has_filter_with_time

      def initialize
        super
        # Resolved once per instance; #filter_stream branches on this flag for
        # every stream instead of re-inspecting the class.
        @has_filter_with_time = has_filter_with_time?
        # Metric objects are created in #configure.
        @emit_records_metrics = nil
        @emit_size_metrics = nil
        # NOTE(review): not referenced anywhere in this file; presumably kept
        # for subclasses or backward compatibility - verify before removing.
        @counter_mutex = Mutex.new
        @enable_size_metrics = false
      end

      # @return the current value of the "emit_records" metric, as reported
      #   by the metric object's #get. Only usable after #configure has run;
      #   before that the metric is nil and this raises NoMethodError.
      def emit_records
        @emit_records_metrics.get
      end

      # @return the current value of the "emit_size" metric, as reported by
      #   the metric object's #get. Only usable after #configure has run.
      def emit_size
        @emit_size_metrics.get
      end

      # Creates this plugin's metrics and reads the system-wide
      # `enable_size_metrics` switch (coerced to a strict boolean).
      #
      # @param conf the plugin configuration, passed through to super.
      def configure(conf)
        super

        @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "filter", name: "emit_records", help_text: "Number of count emit records")
        @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "filter", name: "emit_size", help_text: "Total size of emit events")
        @enable_size_metrics = !!system_config.enable_size_metrics
      end

      # @return [Hash] `{ 'filter' => { 'emit_records' => ..., 'emit_size' => ... } }`
      #   built from the current metric values.
      def statistics
        stats = {
          'emit_records' => @emit_records_metrics.get,
          'emit_size' => @emit_size_metrics.get,
        }

        { 'filter' => stats }
      end

      # Adds an event stream's record count to the "emit_records" metric and,
      # when size metrics are enabled, its msgpack-serialized byte size to the
      # "emit_size" metric. The caller is not visible here - presumably the
      # event router invokes this for each stream it filters; confirm.
      #
      # @param es the event stream; must respond to #size and #to_msgpack_stream.
      def measure_metrics(es)
        @emit_records_metrics.add(es.size)
        # Serialization only happens when size metrics are switched on.
        @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
      end

      # Per-record hook: return the new record, or a falsy value to drop it.
      # @raise [NotImplementedError] unless overridden by the plugin.
      def filter(tag, time, record)
        raise NotImplementedError, "BUG: filter plugins MUST implement this method"
      end

      # Per-record hook: return `[new_time, new_record]`; the event is dropped
      # unless both are truthy.
      # @raise [NotImplementedError] unless overridden by the plugin.
      def filter_with_time(tag, time, record)
        raise NotImplementedError, "BUG: filter plugins MUST implement this method"
      end

      # Runs the plugin's per-record hook over every event in +es+.
      #
      # A StandardError raised by the hook for one event does not abort the
      # stream: that event is routed, unmodified, to
      # `router.emit_error_event` and left out of the result.
      #
      # @param tag the event tag, passed through to the hook.
      # @param es the incoming event stream, iterated as `|time, record|`.
      # @return [MultiEventStream] a new stream containing the kept events.
      def filter_stream(tag, es)
        new_es = MultiEventStream.new
        if @has_filter_with_time
          es.each do |time, record|
            begin
              filtered_time, filtered_record = filter_with_time(tag, time, record)
              new_es.add(filtered_time, filtered_record) if filtered_time && filtered_record
            rescue => e
              router.emit_error_event(tag, time, record, e)
            end
          end
        else
          es.each do |time, record|
            begin
              filtered_record = filter(tag, time, record)
              new_es.add(time, filtered_record) if filtered_record
            rescue => e
              router.emit_error_event(tag, time, record, e)
            end
          end
        end
        new_es
      end

      private

      # Decides which per-record hook #filter_stream should call.
      #
      # @return [nil] when `filter_stream` is overridden
      # @return [false] when `filter` is implemented
      # @return [true] when `filter_with_time` is implemented
      # @raise [RuntimeError] when both `filter` and `filter_with_time` are implemented
      # @raise [NotImplementedError] when neither is implemented
      def has_filter_with_time?
        implemented_methods = implemented_filter_methods
        # Plugins that override `filter_stream` don't need check,
        # because they may not call `filter` or `filter_with_time`
        # for example fluentd/lib/fluent/plugin/filter_record_transformer.rb
        return nil if implemented_methods.include?(:filter_stream)
        case
        when [:filter, :filter_with_time].all? { |e| implemented_methods.include?(e) }
          raise "BUG: Filter plugins MUST implement either `filter` or `filter_with_time`"
        when implemented_methods.include?(:filter)
          false
        when implemented_methods.include?(:filter_with_time)
          true
        else
          raise NotImplementedError, "BUG: Filter plugins MUST implement either `filter` or `filter_with_time`"
        end
      end

      # Returns the instance methods defined directly by the class or module
      # that determines this plugin's filter hooks.
      #
      # The plugin class itself is consulted first, so any class that defines
      # one of the hooks is judged on its own definitions alone. Only when it
      # defines none of them are its ancestors searched, nearest first, up to
      # but excluding Fluent::Plugin::Filter (whose placeholder hooks must
      # not count). This lets a plugin subclass another concrete filter plugin,
      # or pick its hook up from a mixin, without redefining it.
      def implemented_filter_methods
        hooks = [:filter, :filter_with_time, :filter_stream]
        plugin_class = self.class
        own_methods = plugin_class.instance_methods(false)
        return own_methods unless (own_methods & hooks).empty?

        # Skip modules prepended to the plugin class and the class itself
        # (already checked), then stop at the base Filter class.
        inherited = plugin_class.ancestors
                                .drop_while { |mod| !mod.equal?(plugin_class) }
                                .drop(1)
                                .take_while { |mod| !mod.equal?(Fluent::Plugin::Filter) }
        inherited.each do |mod|
          mod_methods = mod.instance_methods(false)
          return mod_methods unless (mod_methods & hooks).empty?
        end

        # Nothing found: let has_filter_with_time? raise NotImplementedError.
        own_methods
      end
    end
  end
end