# fluent/fluentd — lib/fluent/supervisor.rb
# (Captured page header: Summary; Maintainability: F, 6 days; Test Coverage.)
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fileutils'
require 'open3'
require 'pathname'

require 'fluent/config'
require 'fluent/counter'
require 'fluent/env'
require 'fluent/engine'
require 'fluent/error'
require 'fluent/log'
require 'fluent/plugin'
require 'fluent/rpc'
require 'fluent/system_config'
require 'fluent/msgpack_factory'
require 'fluent/variable_store'
require 'serverengine'

# The Win32 named-event IPC used by the supervisor on Windows needs these
# libraries; other platforms rely on POSIX signal traps and never load them.
%w[win32/ipc win32/event].each { |lib| require lib } if Fluent.windows?

module Fluent
  module ServerModule
    # ServerEngine hook run in the supervisor before any worker is started.
    #
    # Creates the per-run lock directory (exported as FLUENTD_LOCK_DIR), starts
    # the optional RPC and counter servers, installs the platform-specific
    # signal/event handlers and, unless disabled, opens the shared socket
    # manager whose path is exported as SERVERENGINE_SOCKETMANAGER_PATH.
    def before_run
      @fluentd_conf = config[:fluentd_conf]
      @rpc_endpoint = @rpc_server = @counter = nil

      # Exported through ENV so spawned worker processes can find the lock directory.
      @fluentd_lock_dir = Dir.mktmpdir("fluentd-lock-")
      ENV['FLUENTD_LOCK_DIR'] = @fluentd_lock_dir

      rpc_endpoint = config[:rpc_endpoint]
      if rpc_endpoint
        @rpc_endpoint = rpc_endpoint
        @enable_get_dump = config[:enable_get_dump]
        run_rpc_server
      end

      Fluent.windows? ? install_windows_event_handler : install_supervisor_signal_handlers

      counter_conf = config[:counter_server]
      run_counter_server(counter_conf) if counter_conf

      if config[:disable_shared_socket]
        $log.info "shared socket for multiple workers is disabled"
      else
        socket_server = ServerEngine::SocketManager::Server.open
        ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_server.path.to_s
      end
    end

    # ServerEngine hook run in the supervisor after the workers are gone:
    # tears down what before_run set up.
    def after_run
      if Fluent.windows?
        stop_windows_event_thread
      end
      if @rpc_endpoint
        stop_rpc_server
      end
      if @counter
        stop_counter_server
      end
      cleanup_lock_dir
      Fluent::Supervisor.cleanup_resources
    end

    # Delete the worker lock files ("fluentd-*.lock"), then the temporary lock
    # directory created in before_run.
    def cleanup_lock_dir
      lock_files = Dir.glob(File.join(@fluentd_lock_dir, "fluentd-*.lock"))
      FileUtils.rm(lock_files)
      FileUtils.rmdir(@fluentd_lock_dir)
    end

    # Start the built-in RPC HTTP server on @rpc_endpoint.
    #
    # Each endpoint logs "fluentd RPC got <path> request" at debug level and
    # then runs its action. On POSIX most actions just signal the supervisor
    # itself, so the regular trap handlers do the work. On Windows the handler
    # methods are called directly. /api/config.getDump is only mounted when
    # @enable_get_dump is set.
    def run_rpc_server
      @rpc_server = RPC::Server.new(@rpc_endpoint, $log)

      # Mount +path+ with the shared debug log line; the action's return value
      # is handed back to the RPC server unchanged.
      mount = lambda do |path, &action|
        @rpc_server.mount_proc(path) do |req, res|
          $log.debug "fluentd RPC got #{path} request"
          action.call(req, res)
        end
      end

      # built-in RPC for signals
      mount.call('/api/processes.interruptWorkers') do
        Process.kill :INT, Process.pid
        nil
      end

      mount.call('/api/processes.killWorkers') do
        Process.kill :TERM, Process.pid
        nil
      end

      mount.call('/api/processes.flushBuffersAndKillWorkers') do
        if Fluent.windows?
          supervisor_sigusr1_handler
          stop(true)
        else
          Process.kill :USR1, Process.pid
          Process.kill :TERM, Process.pid
        end
        nil
      end

      mount.call('/api/plugins.flushBuffers') do
        Fluent.windows? ? supervisor_sigusr1_handler : Process.kill(:USR1, Process.pid)
        nil
      end

      mount.call('/api/config.reload') do
        # On Windows, restart the workers through ServerEngine's auto-restart by killing them
        Fluent.windows? ? kill_worker : Process.kill(:HUP, Process.pid)
        nil
      end

      mount.call('/api/config.dump') do
        $log.info "dump in-memory config"
        supervisor_dump_config_handler
        nil
      end

      mount.call('/api/config.gracefulReload') do
        Fluent.windows? ? supervisor_sigusr2_handler : Process.kill(:USR2, Process.pid)
        nil
      end

      if @enable_get_dump
        mount.call('/api/config.getDump') do |_req, res|
          $log.info "get dump in-memory config via HTTP"
          res.body = supervisor_get_dump_config_handler
          [nil, nil, res]
        end
      end

      @rpc_server.start
    end

    # Shut down the RPC server created by run_rpc_server.
    def stop_rpc_server
      @rpc_server.shutdown
    end

    # Start the counter server described by +counter_conf+ (scope, bind
    # address, port and backup path) and keep it in @counter.
    def run_counter_server(counter_conf)
      server_options = {
        host: counter_conf.bind,
        port: counter_conf.port,
        log: $log,
        path: counter_conf.backup_path,
      }
      @counter = Fluent::Counter::Server.new(counter_conf.scope, server_options)
      @counter.start
    end

    # Stop the counter server started by run_counter_server.
    def stop_counter_server
      @counter.stop
    end

    # Trap HUP / USR1 / USR2 in the supervisor and route them to the
    # supervisor_sig*_handler methods. Does nothing on Windows, where
    # install_windows_event_handler plays this role.
    def install_supervisor_signal_handlers
      unless Fluent.windows?
        trap(:HUP) do
          $log.debug "fluentd supervisor process get SIGHUP"
          supervisor_sighup_handler
        end

        trap(:USR1) do
          $log.debug "fluentd supervisor process get SIGUSR1"
          supervisor_sigusr1_handler
        end

        trap(:USR2) do
          $log.debug 'fluentd supervisor process got SIGUSR2'
          supervisor_sigusr2_handler
        end
      end
    end

    if Fluent.windows?
      # Windows-only overrides of ServerEngine::MultiSpawnWorker#restart/#reload.
      # Fluentd's supervisor does not use ServerEngine's HUP/USR1/USR2 handlers
      # (see install_supervisor_signal_handlers), so these simply forward the
      # matching pipe command to every worker monitor.
      def restart(graceful)
        command = graceful ? "GRACEFUL_RESTART\n" : "IMMEDIATE_RESTART\n"
        @monitors.each { |monitor| monitor.send_command(command) }
      end

      def reload
        @monitors.each { |monitor| monitor.send_command("RELOAD\n") }
      end
    end

    # Windows replacement for install_supervisor_signal_handlers.
    #
    # Starts a thread that waits on named Win32 events. The names are
    # "fluentd_<pid>[_SUFFIX]" and, when config[:signame] is set,
    # "<signame>[_SUFFIX]". Each event runs the matching supervisor action
    # (stop / HUP / USR1 / USR2 / CONT). The "_STOP_EVENT_THREAD" event, set by
    # stop_windows_event_thread, ends the thread. All events are closed when it exits.
    #
    # @return [Thread, nil] the waiting thread, or nil when not on Windows
    def install_windows_event_handler
      return unless Fluent.windows?

      @pid_signame = "fluentd_#{Process.pid}"
      @signame = config[:signame]

      Thread.new do
        ipc = Win32::Ipc.new(nil)
        events = [
          {win32_event: Win32::Event.new("#{@pid_signame}_STOP_EVENT_THREAD"), action: :stop_event_thread},
          {win32_event: Win32::Event.new("#{@pid_signame}"), action: :stop},
          {win32_event: Win32::Event.new("#{@pid_signame}_HUP"), action: :hup},
          {win32_event: Win32::Event.new("#{@pid_signame}_USR1"), action: :usr1},
          {win32_event: Win32::Event.new("#{@pid_signame}_USR2"), action: :usr2},
          {win32_event: Win32::Event.new("#{@pid_signame}_CONT"), action: :cont},
        ]
        if @signame
          signame_events = [
            {win32_event: Win32::Event.new("#{@signame}"), action: :stop},
            {win32_event: Win32::Event.new("#{@signame}_HUP"), action: :hup},
            {win32_event: Win32::Event.new("#{@signame}_USR1"), action: :usr1},
            {win32_event: Win32::Event.new("#{@signame}_USR2"), action: :usr2},
            {win32_event: Win32::Event.new("#{@signame}_CONT"), action: :cont},
          ]
          events.concat(signame_events)
        end

        # Loop-invariant: the handles waited on and the INFINITE timeout.
        win32_events = events.map { |e| e[:win32_event] }
        infinite = 0xFFFFFFFF
        begin
          loop do
            # wait_any's result is a 1-based index into win32_events.
            ipc_idx = ipc.wait_any(win32_events, infinite)
            event_idx = ipc_idx - 1

            unless event_idx >= 0 && event_idx < events.length
              # Not one of our events. Indexing `events` with it would wrap
              # around (negative index -> some other event's action) or yield
              # nil and kill this thread, so skip it without dispatching.
              $log.warn("Unexpected return value of Win32::Ipc#wait_any: #{ipc_idx}")
              next
            end

            event = events[event_idx]
            $log.debug("Got Win32 event \"#{event[:win32_event].name}\"")
            case event[:action]
            when :stop
              stop(true)
            when :hup
              supervisor_sighup_handler
            when :usr1
              supervisor_sigusr1_handler
            when :usr2
              supervisor_sigusr2_handler
            when :cont
              supervisor_dump_handler_for_windows
            when :stop_event_thread
              break
            end
          end
        ensure
          events.each { |event| event[:win32_event].close }
        end
      end
    end

    # Signal the "<pid>_STOP_EVENT_THREAD" event so the thread started by
    # install_windows_event_handler leaves its wait loop. No-op off Windows.
    def stop_windows_event_thread
      return unless Fluent.windows?

      stop_event = Win32::Event.open("#{@pid_signame}_STOP_EVENT_THREAD")
      stop_event.set
      stop_event.close
    end

    # SIGHUP: restart all workers by killing them (see kill_worker).
    def supervisor_sighup_handler
      kill_worker
    end

    # SIGUSR1: reopen the supervisor's own log, then forward USR1 to every
    # worker (workers flush their buffers on it).
    def supervisor_sigusr1_handler
      reopen_log
      send_signal_to_workers(:USR1)
    end

    # Graceful reload (SIGUSR2, or its Windows event / RPC equivalent).
    #
    # Re-reads the config and validates it by reloading it into the
    # supervisor's own Engine. Only if that succeeds does it reopen the log,
    # send USR2 to the workers and update @fluentd_conf (served by
    # config.dump / config.getDump). Any StandardError is logged and the
    # running configuration is kept.
    def supervisor_sigusr2_handler
      # Runs in a separate thread that is joined right away, presumably for the
      # same trap-context mutex restriction noted in reopen_log.
      conf = Thread.new do
        # Disable the automatic report before doing any work. #value below
        # re-raises the error into the rescue clause. Setting this from the
        # outside after Thread.new would race with the thread failing early.
        Thread.current.report_on_exception = false

        $log.info 'Reloading new config'

        # Validate that loading config is valid at first
        # NOTE(review): unlike Supervisor#configure / #reload_config, no `type:` is
        # passed here (serverengine_config does not forward config_file_type),
        # so Fluent::Config.build uses its default file type — confirm that
        # non-default config types reload correctly in the supervisor.
        new_conf = Fluent::Config.build(
          config_path: config[:config_path],
          encoding: config[:conf_encoding],
          additional_config: config[:inline_config],
          use_v1_config: config[:use_v1_config],
        )

        Fluent::VariableStore.try_to_reset do
          Fluent::Engine.reload_config(new_conf, supervisor: true)
        end

        new_conf
      end.value

      reopen_log
      send_signal_to_workers(:USR2)
      @fluentd_conf = conf.to_s
    rescue => e
      $log.error "Failed to reload config file: #{e}"
    end

    # Windows counterpart of SIGCONT: write a sigdump of the supervisor (in a
    # background thread) and ask every worker to dump as well.
    #
    # On UNIX-like systems each process already writes its dump-file when it
    # receives SIGCONT, which predates this Windows implementation. SIGCONT
    # could be trapped here too, but for backward compatibility this handler
    # stays Windows-only.
    def supervisor_dump_handler_for_windows
      unless Fluent.windows?
        raise "[BUG] This function is for Windows ONLY."
      end

      Thread.new do
        begin
          FluentSigdump.dump_windows
        rescue => e
          $log.error "failed to dump: #{e}"
        end
      end

      send_signal_to_workers(:CONT)
    rescue => e
      # Also catches the [BUG] raise above, so misuse is only logged.
      $log.error "failed to dump: #{e}"
    end

    # Kill every known worker so that ServerEngine's auto-restart brings it back
    # (used for SIGHUP and, on Windows, for the config.reload RPC).
    #
    # The pid table is snapshotted and cleared before signalling. Restarted
    # workers register themselves again in WorkerModule#after_start. Windows
    # workers get KILL, all others get TERM.
    #
    # @return [Hash, nil] the snapshot of signalled pids, or nil when none were recorded
    def kill_worker
      worker_pids = config[:worker_pid]
      return unless worker_pids

      snapshot = worker_pids.clone
      worker_pids.clear
      kill_signal = Fluent.windows? ? :KILL : :TERM
      snapshot.each_value { |pid| Process.kill(kill_signal, pid) }
    end

    # Log the in-memory config text (RPC /api/config.dump).
    def supervisor_dump_config_handler
      $log.info(@fluentd_conf)
    end

    # Response for RPC /api/config.getDump: the in-memory config text under :conf.
    def supervisor_get_dump_config_handler
      { conf: @fluentd_conf }
    end

    # Delegate to the inherited dump unless @stop is set.
    def dump
      return if @stop

      super
    end

    private

    # Reopen the supervisor's log output, if a logger exists.
    # The work is done in a new thread because a mutex cannot be locked in the
    # main thread while in trap context.
    #
    # @return [Thread, nil]
    def reopen_log
      return unless $log

      Thread.new { $log.reopen! }
    end

    # Forward +signal+ to every registered worker.
    #
    # On Windows the signal is translated into a pipe command
    # (send_command_to_workers). Elsewhere it is delivered with Process.kill.
    # Does nothing while no worker pid has been recorded.
    def send_signal_to_workers(signal)
      worker_pids = config[:worker_pid]
      return unless worker_pids
      return send_command_to_workers(signal) if Fluent.windows?

      worker_pids.each_value do |pid|
        # Errno::ESRCH is deliberately not rescued here (invalid status)
        Process.kill(signal, pid)
      end
    end

    # Windows: translate a POSIX signal name into ServerEngine's pipe-based
    # CommandSender operations. Unknown signals are ignored.
    #   HUP -> immediate restart, USR1 -> graceful restart,
    #   USR2 -> reload,           CONT -> dump
    def send_command_to_workers(signal)
      case signal
      when :HUP  then restart(false)
      when :USR1 then restart(true)
      when :USR2 then reload
      when :CONT then dump_all_windows_workers
      end
    end

    # Ask every worker monitor to write a sigdump.
    def dump_all_windows_workers
      @monitors.each { |monitor| monitor.send_command("DUMP\n") }
    end
  end

  # Mixed into ServerEngine's per-worker monitor (ServerEngine.create in
  # Supervisor#supervise); each worker is a spawned Fluentd process.
  module WorkerModule
    # Spawn config[:main_cmd] with this worker's id exported as
    # SERVERENGINE_WORKER_ID.
    def spawn(process_manager)
      worker_env = { 'SERVERENGINE_WORKER_ID' => @worker_id.to_i.to_s }
      @pm = process_manager.spawn(worker_env, *config[:main_cmd])
    end

    # Record the spawned pid in config[:worker_pid], the table the supervisor
    # uses to signal or kill workers.
    def after_start
      worker_pids = (config[:worker_pid] ||= {})
      worker_pids[@worker_id] = @pm.pid
    end

    # Delegate to the inherited dump unless @stop is set.
    def dump
      return if @stop

      super
    end
  end

  class Supervisor
    # Translate the string-keyed supervisor parameters (built in #supervise)
    # into the symbol-keyed configuration handed to ServerEngine.create.
    #
    # Besides ServerEngine's own options, the hash carries Fluentd-specific keys
    # (fluentd_conf, config_path, use_v1_config, main_cmd, signame, ...). These
    # are read back via `config[...]` in ServerModule and WorkerModule.
    #
    # @param params [Hash{String=>Object}]
    # @return [Hash{Symbol=>Object}]
    def self.serverengine_config(params = {})
      # ServerEngine's "daemonize" option is boolean, and path of pid file is brought by "pid_path"
      pid_path = params['daemonize']
      daemonize = !!params['daemonize']

      se_config = {
        worker_type: 'spawn',
        workers: params['workers'],
        log_stdin: false,
        log_stdout: false,
        log_stderr: false,
        enable_heartbeat: true,
        auto_heartbeat: false,
        unrecoverable_exit_codes: [2],
        stop_immediately_at_unrecoverable_exit: true,
        root_dir: params['root_dir'],
        logger: $log,
        log: $log.out,
        log_level: params['log_level'],
        chuser: params['chuser'],
        chgroup: params['chgroup'],
        chumask: params['chumask'],
        daemonize: daemonize,
        rpc_endpoint: params['rpc_endpoint'],
        counter_server: params['counter_server'],
        enable_get_dump: params['enable_get_dump'],
        windows_daemon_cmdline: [ServerEngine.ruby_bin_path,
                                 File.join(File.dirname(__FILE__), 'daemon.rb'),
                                 ServerModule.name,
                                 WorkerModule.name,
                                 JSON.dump(params)],
        command_sender: Fluent.windows? ? "pipe" : "signal",
        config_path: params['fluentd_conf_path'],
        fluentd_conf: params['fluentd_conf'],
        conf_encoding: params['conf_encoding'],
        # Read by ServerModule#supervisor_sigusr2_handler when it rebuilds the
        # config. #supervise passes it, but it used to be dropped here, so the
        # supervisor always called Fluent::Config.build with use_v1_config: nil.
        use_v1_config: params['use_v1_config'],
        inline_config: params['inline_config'],
        main_cmd: params['main_cmd'],
        signame: params['signame'],
        disable_shared_socket: params['disable_shared_socket'],
        restart_worker_interval: params['restart_worker_interval'],
      }
      se_config[:pid_path] = pid_path if daemonize

      se_config
    end

    # Baseline command-line options; Supervisor#initialize merges the parsed
    # command line over a fresh copy of this hash.
    #
    # @return [Hash{Symbol=>Object}] a new hash on every call
    def self.default_options
      {
        # where the configuration and plugins come from
        config_path: Fluent::DEFAULT_CONFIG_PATH,
        plugin_dirs: [Fluent::DEFAULT_PLUGIN_DIR],
        # logging destination
        log_level: Fluent::Log::LEVEL_INFO,
        log_path: nil,
        # process / privilege handling
        daemonize: nil,
        libs: [],
        setup_path: nil,
        chuser: nil,
        chgroup: nil,
        chumask: "0",
        root_dir: nil,
        # log noise control
        suppress_interval: 0,
        suppress_repeated_stacktrace: true,
        ignore_repeated_log_interval: nil,
        # feature switches
        without_source: nil,
        enable_input_metrics: nil,
        enable_size_metrics: nil,
        use_v1_config: true,
        strict_config_value: nil,
        # supervision mode
        supervise: true,
        standalone_worker: false,
        signame: nil,
        # config file reading
        conf_encoding: 'utf-8',
        disable_shared_socket: nil,
        config_file_type: :guess,
      }
    end

    # Remove the socket-manager socket file exported in
    # SERVERENGINE_SOCKETMANAGER_PATH. Nothing is done on Windows.
    def self.cleanup_resources
      return if Fluent.windows?

      socket_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
      FileUtils.rm_f(socket_path) if socket_path
    end

    # @param cl_opt [Hash{Symbol=>Object}] options parsed from the command line.
    #   They are merged over default_options for the instance settings below.
    #   The raw hash is kept in @cl_opt so build_system_config can let explicit
    #   command-line values override <system> settings.
    def initialize(cl_opt)
      @cl_opt = cl_opt
      opt = self.class.default_options.merge(cl_opt)

      @config_file_type = opt[:config_file_type]
      @daemonize = opt[:daemonize]
      @standalone_worker = opt[:standalone_worker]
      @config_path = opt[:config_path]
      @inline_config = opt[:inline_config]
      @use_v1_config = opt[:use_v1_config]
      @conf_encoding = opt[:conf_encoding]
      @log_path = opt[:log_path]
      @show_plugin_config = opt[:show_plugin_config]
      @libs = opt[:libs]
      @plugin_dirs = opt[:plugin_dirs]
      @chgroup = opt[:chgroup]
      @chuser = opt[:chuser]
      @chumask = opt[:chumask]
      @signame = opt[:signame]

      # TODO: drop `@log_rotate_age` / `@log_rotate_size` in favour of
      # `system_config.log.rotate_age` / `system_config.log.rotate_size`.
      # They exist only because the `system_config.log` parameters are missing
      # from `Fluent::SystemConfig::SYSTEM_CONFIG_PARAMETERS`, so
      # `build_system_config()` does not merge them. Keep these until that
      # `Fluent::SystemConfig` bug is fixed.
      @log_rotate_age = opt[:log_rotate_age]
      @log_rotate_size = opt[:log_rotate_size]

      @finished = false
    end

    # Entry point of the supervisor process.
    #
    # Validates the worker count and root_dir (creating root_dir if missing),
    # switches user/group, then initializes and configures the Engine in
    # supervisor mode. A config error logs and exits with status 1. In dry-run
    # mode the process exits 0 after configuring; otherwise it starts
    # supervising the workers.
    #
    # @raise [Fluent::ConfigError] when fewer than one worker is configured
    # @raise [Fluent::InvalidRootDirectory] when root_dir is unusable
    def run_supervisor(dry_run: false)
      $log.info "starting fluentd-#{Fluent::VERSION} as dry run mode", ruby: RUBY_VERSION if dry_run

      worker_count = @system_config.workers
      if worker_count < 1
        raise Fluent::ConfigError, "invalid number of workers (must be > 0):#{worker_count}"
      end

      root_dir = @system_config.root_dir
      if root_dir && File.exist?(root_dir)
        unless Dir.exist?(root_dir)
          raise Fluent::InvalidRootDirectory, "non directory entry exists:#{root_dir}"
        end
      elsif root_dir
        begin
          FileUtils.mkdir_p(root_dir, mode: @system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION)
        rescue => e
          raise Fluent::InvalidRootDirectory, "failed to create root directory:#{root_dir}, #{e.inspect}"
        end
      end

      begin
        ServerEngine::Privilege.change(@chuser, @chgroup)
        MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support)
        Fluent::Engine.init(@system_config, supervisor_mode: true)
        Fluent::Engine.run_configure(@conf, dry_run: dry_run)
      rescue Fluent::ConfigError => e
        $log.error 'config error', file: @config_path, error: e
        $log.debug_backtrace
        exit!(1)
      end

      return supervise unless dry_run

      $log.info 'finished dry run mode'
      exit 0
    end

    # A few effective settings, as a string-keyed hash.
    #
    # @return [Hash{String=>Object}]
    def options
      opts = {}
      opts['config_path'] = @config_path
      opts['pid_file'] = @daemonize
      opts['plugin_dirs'] = @plugin_dirs
      opts['log_path'] = @log_path
      opts['root_dir'] = @system_config.root_dir
      opts
    end

    # Entry point of a worker process. The worker is either spawned by the
    # supervisor, or runs standalone with --no-supervisor.
    #
    # Installs the worker's signal handlers, then initializes, configures and
    # runs the Engine inside main_process, which turns errors into the exit
    # codes ServerEngine understands. A standalone worker also creates its own
    # socket manager and changes user/group/umask itself.
    #
    # The process title is set by main_process. A previous
    # `setproctitle(...) if @process_name` here never ran, because @process_name
    # is never assigned.
    #
    # @raise [Fluent::ConfigError] when --no-supervisor is combined with workers != 1
    def run_worker
      if @standalone_worker && @system_config.workers != 1
        raise Fluent::ConfigError, "invalid number of workers (must be 1 or unspecified) with --no-supervisor: #{@system_config.workers}"
      end

      install_main_process_signal_handlers

      # This is the only log message for @standalone_worker
      $log.info "starting fluentd-#{Fluent::VERSION} without supervision", pid: Process.pid, ruby: RUBY_VERSION if @standalone_worker

      main_process do
        if @standalone_worker
          # Without a supervisor, the worker does the supervisor's setup itself.
          create_socket_manager
          ServerEngine::Privilege.change(@chuser, @chgroup)
          File.umask(@chumask.to_i(8))
        end
        MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support)
        Fluent::Engine.init(@system_config)
        Fluent::Engine.run_configure(@conf)
        Fluent::Engine.run
        self.class.cleanup_resources if @standalone_worker
        exit 0
      end
    end

    # Prepare this process: logger, configuration and plugin environment.
    #
    # Sets up $log first (setup_global_logger), optionally handles the
    # deprecated show_plugin_config option, builds @conf and @system_config,
    # requires the libraries in @libs and registers every existing plugin
    # directory. The supervisor additionally logs the versions of installed
    # fluentd / fluent-plugin-* / fluent-mixin-* gems.
    #
    # @param supervisor [Boolean] true when called in the supervisor process
    def configure(supervisor: false)
      setup_global_logger(supervisor: supervisor)

      show_plugin_config if @show_plugin_config

      if @inline_config == '-'
        $log.warn('the value "-" for `inline_config` is deprecated. See https://github.com/fluent/fluentd/issues/2711')
        @inline_config = STDIN.read
      end

      @conf = Fluent::Config.build(
        config_path: @config_path,
        encoding: @conf_encoding,
        additional_config: @inline_config,
        use_v1_config: @use_v1_config,
        type: @config_file_type,
      )
      @system_config = build_system_config(@conf)

      $log.info :supervisor, 'parsing config file is succeeded', path: @config_path

      @libs.each { |lib| require lib }

      @plugin_dirs.each do |dir|
        next unless Dir.exist?(dir)

        Fluent::Plugin.add_plugin_dir(File.expand_path(dir))
      end

      return unless supervisor

      # plugins / configuration dumps
      fluentd_gem_name = /^fluent(d|-(plugin|mixin)-.*)$/
      Gem::Specification.find_all.select { |spec| spec.name =~ fluentd_gem_name }.each do |spec|
        $log.info("gem '#{spec.name}' version '#{spec.version}'")
      end
    end

    private

    # Create the process-wide logger ($log).
    #
    # The config file is parsed once here only to read the <system> / log
    # settings. The logger does not exist yet, so messages from this parse
    # cannot be seen; #configure parses the file again afterwards.
    #
    # Process type / worker id attached to log lines:
    #   supervisor: true -> :supervisor, worker id 0
    #   otherwise        -> :standalone (--no-supervisor), :worker0 or :workers,
    #                       with the worker id taken from SERVERENGINE_WORKER_ID
    #
    # Output goes to STDOUT unless @log_path names a file ("-" means STDOUT).
    # With rotation a Fluent::LogDeviceIO is used. On Windows each process
    # gets its own per-process path, because Windows locks files.
    def setup_global_logger(supervisor: false)
      if supervisor
        worker_id = 0
        process_type = :supervisor
      else
        worker_id = ENV['SERVERENGINE_WORKER_ID'].to_i
        process_type = case
                       when @standalone_worker then :standalone
                       when worker_id == 0 then :worker0
                       else :workers
                       end
      end

      # Parse configuration immediately to initialize logger in early stage.
      # Since we can't confirm the log messages in this parsing process,
      # we must parse the config again after initializing logger.
      conf = Fluent::Config.build(
        config_path: @config_path,
        encoding: @conf_encoding,
        additional_config: @inline_config,
        use_v1_config: @use_v1_config,
        type: @config_file_type,
      )
      system_config = build_system_config(conf)

      # TODO: we should remove this logic. This merging process should be done
      # in `build_system_config()`.
      # Values already set from the command line win over <system><log>.
      @log_rotate_age ||= system_config.log.rotate_age
      @log_rotate_size ||= system_config.log.rotate_size

      rotate = @log_rotate_age || @log_rotate_size
      actual_log_path = @log_path

      # We need to prepare a unique path for each worker since Windows locks files.
      if Fluent.windows? && rotate && @log_path && @log_path != "-"
        actual_log_path = Fluent::Log.per_process_path(@log_path, process_type, worker_id)
      end

      if actual_log_path && actual_log_path != "-"
        FileUtils.mkdir_p(File.dirname(actual_log_path)) unless File.exist?(actual_log_path)
        if rotate
          logdev = Fluent::LogDeviceIO.new(
            actual_log_path,
            shift_age: @log_rotate_age,
            shift_size: @log_rotate_size,
          )
        else
          logdev = File.open(actual_log_path, "a")
        end

        # Give the log file to the configured chuser/chgroup.
        if @chuser || @chgroup
          chuid = @chuser ? ServerEngine::Privilege.get_etc_passwd(@chuser).uid : nil
          chgid = @chgroup ? ServerEngine::Privilege.get_etc_group(@chgroup).gid : nil
          File.chown(chuid, chgid, actual_log_path)
        end

        # Only applied when dir_permission is configured (so no fallback is needed).
        dir_permission = system_config.dir_permission
        File.chmod(dir_permission, File.dirname(actual_log_path)) if dir_permission
      else
        logdev = STDOUT
      end

      $log = Fluent::Log.new(
        # log_level: subtract 1 to match serverengine daemon logger side logging severity.
        ServerEngine::DaemonLogger.new(logdev, log_level: system_config.log_level - 1),
        path: actual_log_path,
        process_type: process_type,
        worker_id: worker_id,
        format: system_config.log.format,
        time_format: system_config.log.time_format,
        suppress_repeated_stacktrace: system_config.suppress_repeated_stacktrace,
        ignore_repeated_log_interval: system_config.ignore_repeated_log_interval,
        ignore_same_log_interval: system_config.ignore_same_log_interval,
      )
      $log.enable_color(false) if actual_log_path
      $log.enable_debug if system_config.log_level <= Fluent::Log::LEVEL_DEBUG

      $log.info "init #{process_type} logger",
                path: actual_log_path,
                rotate_age: @log_rotate_age,
                rotate_size: @log_rotate_size
    end

    # Open a ServerEngine socket manager and export its path through
    # SERVERENGINE_SOCKETMANAGER_PATH (used by the standalone worker).
    def create_socket_manager
      ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = ServerEngine::SocketManager::Server.open.path.to_s
    end

    # Handle the deprecated show_plugin_config option (value like "input:tail").
    # It only points the user at fluent-plugin-config-format, then exits with status 0.
    def show_plugin_config
      plugin_kind, plugin_type = @show_plugin_config.split(":")
      $log.info "show_plugin_config option is deprecated. Use fluent-plugin-config-format --format=txt #{plugin_kind} #{plugin_type}"
      exit 0
    end

    # Start the supervisor proper: build the worker command line, collect the
    # parameters ServerModule / WorkerModule need, and run ServerEngine with
    # them (see Fluent::Supervisor.serverengine_config).
    def supervise
      process_name = @system_config.process_name
      Process.setproctitle("supervisor:#{process_name}") if process_name
      $log.info "starting fluentd-#{Fluent::VERSION}", pid: Process.pid, ruby: RUBY_VERSION

      spawn_cmd = build_spawn_command
      $log.info "spawn command to main: ", cmdline: spawn_cmd

      sys = @system_config
      params = {
        'main_cmd' => spawn_cmd,
        'daemonize' => @daemonize,
        'inline_config' => @inline_config,
        'chuser' => @chuser,
        'chgroup' => @chgroup,
        'fluentd_conf_path' => @config_path,
        'fluentd_conf' => @conf.to_s,
        'use_v1_config' => @use_v1_config,
        'conf_encoding' => @conf_encoding,
        'signame' => @signame,

        'workers' => sys.workers,
        'root_dir' => sys.root_dir,
        'log_level' => sys.log_level,
        'rpc_endpoint' => sys.rpc_endpoint,
        'enable_get_dump' => sys.enable_get_dump,
        'counter_server' => sys.counter_server,
        'disable_shared_socket' => sys.disable_shared_socket,
        'restart_worker_interval' => sys.restart_worker_interval,
      }

      # Note: This block is evaluated only when ServerEngine is initialized, since
      # Fluentd overwrites all related SIGNAL(HUP,USR1,USR2) and has its own reloading feature.
      engine = ServerEngine.create(ServerModule, WorkerModule) do
        Fluent::Supervisor.serverengine_config(params)
      end
      engine.run
    end

    # Install the worker process' signal handlers. This is the only place that
    # does so; ServerEngine's worker-side handler setup is not used because it
    # does almost nothing.
    #
    # INT:  Ctrl-C reaches every process in the group and the ServerEngine
    #       server already answers it by sending TERM to the workers, so INT is
    #       ignored unless this is a standalone (--no-supervisor) worker.
    # TERM: stops the Engine once.
    # USR1 / USR2 / CONT (non-Windows): flush buffers / reload config / sigdump.
    # On Windows, commands arrive through install_main_process_command_handlers.
    def install_main_process_signal_handlers
      trap :INT do
        $log.debug "fluentd main process get SIGINT"

        # When Fluentd is launched without supervisor, worker should handle ctrl-c by itself
        next unless @standalone_worker

        @finished = true
        $log.debug "getting start to shutdown main process"
        Fluent::Engine.stop
      end

      trap :TERM do
        $log.debug "fluentd main process get SIGTERM"
        next if @finished

        @finished = true
        $log.debug "getting start to shutdown main process"
        Fluent::Engine.stop
      end

      return install_main_process_command_handlers if Fluent.windows?

      trap(:USR1) { flush_buffer }
      trap(:USR2) { reload_config }
      trap(:CONT) { dump_non_windows }
    end

    # Windows replacement for the worker's signal traps.
    #
    # The command channel is the process' original stdin. It is dup'ed for a
    # reader thread, and $stdin itself is pointed at the null device. Each line
    # is one command (e.g. "GRACEFUL_RESTART\n", as sent by
    # ServerModule#restart); unknown commands are logged and ignored. The thread
    # ends on EOF or after a STOP command.
    #
    # @return [Thread] the command reader thread
    def install_main_process_command_handlers
      command_pipe = $stdin.dup
      $stdin.reopen(File::NULL, "rb")
      command_pipe.binmode
      command_pipe.sync = true

      Thread.new do
        loop do
          line = command_pipe.gets
          break unless line

          # Non-destructive chomp: String#chomp! returns nil when nothing was
          # removed, which made a final command without a trailing newline
          # fall into the "unknown command" branch.
          cmd = line.chomp
          case cmd
          when "GRACEFUL_STOP", "IMMEDIATE_STOP"
            $log.debug "fluentd main process get #{cmd} command"
            @finished = true
            $log.debug "getting start to shutdown main process"
            Fluent::Engine.stop
            break
          when "GRACEFUL_RESTART"
            $log.debug "fluentd main process get #{cmd} command"
            flush_buffer
          when "RELOAD"
            $log.debug "fluentd main process get #{cmd} command"
            reload_config
          when "DUMP"
            $log.debug "fluentd main process get #{cmd} command"
            dump_windows
          else
            $log.warn "fluentd main process get unknown command [#{cmd}]"
          end
        end
      end
    end

    # Force-flush all buffered events (SIGUSR1, or GRACEFUL_RESTART on Windows).
    # Runs in a new thread because a mutex cannot be locked in the main thread
    # during trap context. The log is reopened before flushing.
    #
    # @return [Thread]
    def flush_buffer
      Thread.new do
        begin
          $log.debug "fluentd main process get SIGUSR1"
          $log.info "force flushing buffered events"
          $log.reopen!
          Fluent::Engine.flush!
          $log.debug "flushing thread: flushed"
        # NOTE(review): rescues Exception (not only StandardError); kept as-is.
        rescue Exception => e
          $log.warn "flushing thread error: #{e}"
        end
      end
    end

    # Worker-side graceful reload (SIGUSR2, or RELOAD on Windows).
    # Rebuilds the config in a new thread and reloads the Engine with it.
    # @conf is replaced only when that succeeds; failures are logged.
    #
    # @return [Thread]
    def reload_config
      Thread.new do
        $log.debug('worker got SIGUSR2')

        begin
          new_conf = Fluent::Config.build(
            config_path: @config_path,
            encoding: @conf_encoding,
            additional_config: @inline_config,
            use_v1_config: @use_v1_config,
            type: @config_file_type,
          )

          Fluent::VariableStore.try_to_reset do
            Fluent::Engine.reload_config(new_conf)
          end

          @conf = new_conf
        rescue => e
          # The supervisor has already validated the config file, but the
          # signal-based handoff between supervisor and worker is not atomic,
          # so the file may have changed in between.
          $log.error("failed to reload config: #{e}")
          nil
        end
      end
    end

    # SIGCONT (non-Windows): write a sigdump unless the worker is shutting
    # down; errors are only logged.
    def dump_non_windows
      Sigdump.dump unless @finished
    rescue => e
      $log.error("failed to dump: #{e}")
    end

    # DUMP command (Windows): write a sigdump from a background thread;
    # errors are only logged.
    #
    # @return [Thread]
    def dump_windows
      Thread.new do
        begin
          FluentSigdump.dump_windows
        rescue => e
          $log.error("failed to dump: #{e}")
        end
      end
    end

    # Yield $log, and also (when $log is not already writing to stdout) a
    # temporary STDOUT logger at the configured level with debug enabled.
    # Fatal worker errors are therefore also visible on the console.
    def logging_with_console_output
      yield $log
      return if $log.stdout?

      console_log = Fluent::Log.new(ServerEngine::DaemonLogger.new(STDOUT))
      console_log.level = @system_config.log_level
      yield console_log.enable_debug
    end

    # Run the worker body and turn any escaping error into a process exit code.
    #
    # Exit codes: 2 for unrecoverable failures (config errors,
    # Fluent::UnrecoverableError, ScriptError), which match ServerEngine's
    # unrecoverable_exit_codes. 1 for any other error, or if the block returns
    # without exiting. The block normally ends with `exit 0`; SystemExit is not
    # rescued here.
    def main_process(&block)
      process_name = @system_config.process_name
      if process_name
        # With several workers, append this worker's id to the title.
        worker_suffix = @system_config.workers > 1 ? ENV['SERVERENGINE_WORKER_ID'] : nil
        Process.setproctitle("worker:#{process_name}#{worker_suffix}")
      end

      exit_code = 1

      begin
        block.call
      rescue Fluent::ConfigError => e
        logging_with_console_output do |log|
          log.error "config error", file: @config_path, error: e
          log.debug_backtrace
        end
        exit_code = 2
      rescue Fluent::UnrecoverableError => e
        logging_with_console_output do |log|
          log.error e.message, error: e
          log.error_backtrace
        end
        exit_code = 2
      rescue ScriptError => e # LoadError, NotImplementedError, SyntaxError
        logging_with_console_output do |log|
          if e.respond_to?(:path)
            log.error e.message, path: e.path, error: e
          else
            log.error e.message, error: e
          end
          log.error_backtrace
        end
        exit_code = 2
      rescue => e
        logging_with_console_output do |log|
          log.error "unexpected error", error: e
          log.error_backtrace
        end
      end

      exit!(exit_code)
    end

    # Build the SystemConfig from +conf+'s <system> section, then let options
    # given explicitly on the command line (present in @cl_opt and non-nil)
    # override it, for the parameters in SYSTEM_CONFIG_PARAMETERS.
    #
    # TODO: There is a bug that `system_config.log.rotate_age/rotate_size` are
    # not merged with the command line options since they are not in
    # `SYSTEM_CONFIG_PARAMETERS`.
    # We have to fix this bug.
    def build_system_config(conf)
      system_config = SystemConfig.create(conf, @cl_opt[:strict_config_value])

      overrides = Fluent::SystemConfig::SYSTEM_CONFIG_PARAMETERS.each_with_object({}) do |param, acc|
        next unless @cl_opt.key?(param)

        value = @cl_opt[param]
        acc[param] = value unless value.nil?
      end
      system_config.overwrite_variables(**overrides)

      system_config
    end

    # Ruby command-line options that select the external/internal encoding.
    RUBY_ENCODING_OPTIONS_REGEX = %r{\A(-E|--encoding=|--internal-encoding=|--external-encoding=)}.freeze

    # Build the command line the supervisor uses to spawn each worker:
    #   ruby [RUBYOPT options] [encoding options] [--jit] $0 <original args> --under-supervisor
    #
    # Options from RUBYOPT are forwarded. When none of them selects an
    # encoding, -Eascii-8bit:ascii-8bit is used. The ruby options are validated
    # once by running them with `-h`; if that fails the supervisor logs the
    # error and exits with the child's status (1 if the child had none).
    #
    # @return [Array<String>]
    def build_spawn_command
      fluentd_spawn_cmd = [ENV['TEST_RUBY_PATH'] || ServerEngine.ruby_bin_path]

      rubyopt = ENV['RUBYOPT']
      if rubyopt
        encodes, others = rubyopt.split(' ').partition { |e| e.match?(RUBY_ENCODING_OPTIONS_REGEX) }
        fluentd_spawn_cmd.concat(others)
        fluentd_spawn_cmd.concat(encodes.empty? ? ['-Eascii-8bit:ascii-8bit'] : encodes)
      else
        fluentd_spawn_cmd << '-Eascii-8bit:ascii-8bit'
      end

      if @system_config.enable_jit
        $log.info "enable Ruby JIT for workers (--jit)"
        fluentd_spawn_cmd << '--jit'
      end

      # Adding `-h` so that it can avoid ruby's command blocking
      # e.g. `ruby -Eascii-8bit:ascii-8bit` will block. but `ruby -Eascii-8bit:ascii-8bit -h` won't.
      _, err, status = Open3.capture3(*fluentd_spawn_cmd, "-h")
      unless status.success?
        $log.error('Invalid option is passed to RUBYOPT', command: fluentd_spawn_cmd, error: err)
        # exitstatus is nil when the probe was terminated by a signal, and
        # Kernel#exit raises TypeError for nil, so fall back to 1.
        exit(status.exitstatus || 1)
      end

      fluentd_spawn_cmd << $0
      fluentd_spawn_cmd.concat($fluentdargv)
      fluentd_spawn_cmd << '--under-supervisor'

      fluentd_spawn_cmd
    end
  end

  # Sigdump helpers for Windows, where dumps are requested through Win32
  # events / pipe commands instead of SIGCONT.
  module FluentSigdump
    # Write a sigdump of the current process and log where it was written.
    #
    # Target: $SIGDUMP_PATH with "-<pid>" inserted before its extension, or
    # "%windir%/Temp/fluentd-sigdump-<pid>.log" when SIGDUMP_PATH is unset or
    # empty. Sigdump's own default is under /tmp, which may not exist on Windows.
    #
    # @raise [RuntimeError] when called on a non-Windows platform
    def self.dump_windows
      raise "[BUG] FluentSigdump.dump_windows is for Windows ONLY." unless Fluent.windows?

      sigdump_path = ENV['SIGDUMP_PATH']
      dump_filepath =
        if sigdump_path.nil? || sigdump_path.empty?
          "#{ENV['windir']}/Temp/fluentd-sigdump-#{Process.pid}.log"
        else
          get_path_with_pid(sigdump_path)
        end

      require 'sigdump'
      Sigdump.dump(dump_filepath)

      $log.info "dump to #{dump_filepath}."
    end

    # Insert "-<pid>" before the extension of +raw_path+
    # (e.g. "C:/dump/sigdump.log" -> "C:/dump/sigdump-1234.log").
    def self.get_path_with_pid(raw_path)
      path = Pathname.new(raw_path)
      path.sub_ext("-#{Process.pid}#{path.extname}").to_s
    end
  end
end