fluent/fluentd

View on GitHub
lib/fluent/plugin/base.rb

Summary

Maintainability
B
4 hrs
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/plugin'
require 'fluent/configurable'
require 'fluent/system_config'

module Fluent
  module Plugin
    class Base
      include Configurable
      include SystemConfig::Mixin

      State = Struct.new(:configure, :start, :after_start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate)

      attr_accessor :under_plugin_development

      def initialize
        @log = nil
        super
        @fluentd_lock_dir = ENV['FLUENTD_LOCK_DIR']
        @_state = State.new(false, false, false, false, false, false, false, false, false)
        @_context_router = nil
        @_fluentd_worker_id = nil
        @under_plugin_development = false
      end

      def has_router?
        false
      end

      def plugin_root_dir
        nil # override this in plugin_id.rb
      end

      def fluentd_worker_id
        return @_fluentd_worker_id if @_fluentd_worker_id
        @_fluentd_worker_id = (ENV['SERVERENGINE_WORKER_ID'] || 0).to_i
        @_fluentd_worker_id
      end

      def configure(conf)
        raise ArgumentError, "BUG: type of conf must be Fluent::Config::Element, but #{conf.class} is passed." unless conf.is_a?(Fluent::Config::Element)

        if conf.for_this_worker? || (Fluent::Engine.supervisor_mode && !conf.for_every_workers?)
          system_config_override(workers: conf.target_worker_ids.size)
        end

        super(conf, system_config.strict_config_value)
        @_state ||= State.new(false, false, false, false, false, false, false, false, false)
        @_state.configure = true
        self
      end

      def multi_workers_ready?
        true
      end

      def get_lock_path(name)
        name = name.gsub(/[^a-zA-Z0-9]/, "_")
        File.join(@fluentd_lock_dir, "fluentd-#{name}.lock")
      end

      def acquire_worker_lock(name)
        if @fluentd_lock_dir.nil?
          raise InvalidLockDirectory, "can't acquire lock because FLUENTD_LOCK_DIR isn't set"
        end
        lock_path = get_lock_path(name)
        File.open(lock_path, "w") do |f|
          f.flock(File::LOCK_EX)
          yield
        end
        # Update access time to prevent tmpwatch from deleting a lock file.
        FileUtils.touch(lock_path);
      end

      def string_safe_encoding(str)
        unless str.valid_encoding?
          str = str.scrub('?')
          log.info "invalid byte sequence is replaced in `#{str}`" if self.respond_to?(:log)
        end
        yield str
      end

      def context_router=(router)
        @_context_router = router
      end

      def context_router
        @_context_router
      end

      def start
        # By initialization order, plugin logger is created before set log_event_enabled.
        # It causes '@id' specified plugin, it uses plugin logger instead of global logger, ignores `<label @FLUENT_LOG>` setting.
        # This is adhoc approach but impact is minimal.
        if @log.is_a?(Fluent::PluginLogger) && $log.respond_to?(:log_event_enabled) # log_event_enabled check for tests
          @log.log_event_enabled = $log.log_event_enabled
        end
        @_state.start = true
        self
      end

      def after_start
        @_state.after_start = true
        self
      end

      def stop
        @_state.stop = true
        self
      end

      def before_shutdown
        @_state.before_shutdown = true
        self
      end

      def shutdown
        @_state.shutdown = true
        self
      end

      def after_shutdown
        @_state.after_shutdown = true
        self
      end

      def close
        @_state.close = true
        self
      end

      def terminate
        @_state.terminate = true
        self
      end

      def configured?
        @_state.configure
      end

      def started?
        @_state.start
      end

      def after_started?
        @_state.after_start
      end

      def stopped?
        @_state.stop
      end

      def before_shutdown?
        @_state.before_shutdown
      end

      def shutdown?
        @_state.shutdown
      end

      def after_shutdown?
        @_state.after_shutdown
      end

      def closed?
        @_state.close
      end

      def terminated?
        @_state.terminate
      end

      def called_in_test?
        caller_locations.each do |location|
          # Thread::Backtrace::Location#path returns base filename or absolute path.
          # #absolute_path returns absolute_path always.
          # https://bugs.ruby-lang.org/issues/12159
          if /\/test_[^\/]+\.rb$/.match?(location.absolute_path) # location.path =~ /test_.+\.rb$/
            return true
          end
        end
        false
      end

      def inspect
        # Plugin instances are sometimes too big to dump because it may have too many thins (buffer,storage, ...)
        # Original commit comment says that:
        #   To emulate normal inspect behavior `ruby -e'o=Object.new;p o;p (o.__id__<<1).to_s(16)'`.
        #   https://github.com/ruby/ruby/blob/trunk/gc.c#L788
        "#<%s:%014x>" % [self.class.name, '0x%014x' % (__id__ << 1)]
      end

      def reloadable_plugin?
        # Engine can't capture all class variables. so it's forbbiden to use class variables in each plugins if enabling reload.
        self.class.class_variables.empty?
      end
    end
  end
end