fluent/fluentd

View on GitHub
lib/fluent/plugin_helper/child_process.rb

Summary

Maintainability
F
3 days
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/plugin_helper/thread'
require 'fluent/plugin_helper/timer'
require 'fluent/clock'

require 'open3'
require 'timeout'

module Fluent
  module PluginHelper
    module ChildProcess
      include Fluent::PluginHelper::Thread
      include Fluent::PluginHelper::Timer

      CHILD_PROCESS_LOOP_CHECK_INTERVAL = 0.2 # sec
      CHILD_PROCESS_DEFAULT_EXIT_TIMEOUT = 10 # sec
      CHILD_PROCESS_DEFAULT_KILL_TIMEOUT = 60 # sec

      MODE_PARAMS = [:read, :write, :stderr, :read_with_stderr]
      STDERR_OPTIONS = [:discard, :connect]

      # stop     : mark callback thread as stopped
      # shutdown : close write IO to child processes (STDIN of child processes), send TERM (KILL for Windows) to all child processes
      # close    : send KILL to all child processes
      # terminate: [-]

      attr_reader :_child_process_processes # for tests

      def child_process_running?
        # checker for code in callback of child_process_execute
        ::Thread.current[:_fluentd_plugin_helper_child_process_running] || false
      end

      def child_process_id
        ::Thread.current[:_fluentd_plugin_helper_child_process_pid]
      end

      def child_process_exist?(pid)
        pinfo = @_child_process_processes[pid]
        return false unless pinfo

        return false if pinfo.exit_status

        true
      end

      # on_exit_callback = ->(status){ ... }
      # status is an instance of Process::Status
      # On Windows, exitstatus=0 and termsig=nil even when child process was killed.
      def child_process_execute(
          title, command,
          arguments: nil, subprocess_name: nil, interval: nil, immediate: false, parallel: false,
          mode: [:read, :write], stderr: :discard, env: {}, unsetenv: false, chdir: nil,
          internal_encoding: 'utf-8', external_encoding: 'ascii-8bit', scrub: true, replace_string: nil,
          wait_timeout: nil, on_exit_callback: nil,
          &block
      )
        raise ArgumentError, "BUG: title must be a symbol" unless title.is_a? Symbol
        raise ArgumentError, "BUG: arguments required if subprocess name is replaced" if subprocess_name && !arguments

        mode ||= []
        mode = [] unless block
        raise ArgumentError, "BUG: invalid mode specification" unless mode.all?{|m| MODE_PARAMS.include?(m) }
        raise ArgumentError, "BUG: read_with_stderr is exclusive with :read and :stderr" if mode.include?(:read_with_stderr) && (mode.include?(:read) || mode.include?(:stderr))
        raise ArgumentError, "BUG: invalid stderr handling specification" unless STDERR_OPTIONS.include?(stderr)

        raise ArgumentError, "BUG: number of block arguments are different from size of mode" if block && block.arity != mode.size

        running = false
        callback = ->(*args) {
          running = true
          begin
            block && block.call(*args)
          ensure
            running = false
          end
        }

        retval = nil
        execute_child_process = ->(){
          child_process_execute_once(
            title, command, arguments,
            subprocess_name, mode, stderr, env, unsetenv, chdir,
            internal_encoding, external_encoding, scrub, replace_string,
            wait_timeout, on_exit_callback,
            &callback
          )
        }

        if immediate || !interval
          retval = execute_child_process.call
        end

        if interval
          timer_execute(:child_process_execute, interval, repeat: true) do
            if !parallel && running
              log.warn "previous child process is still running. skipped.", title: title, command: command, arguments: arguments, interval: interval, parallel: parallel
            else
              execute_child_process.call
            end
          end
        end

        retval # nil if interval
      end

      def initialize
        super
        # plugins MAY configure this parameter
        @_child_process_exit_timeout = CHILD_PROCESS_DEFAULT_EXIT_TIMEOUT
        @_child_process_kill_timeout = CHILD_PROCESS_DEFAULT_KILL_TIMEOUT
        @_child_process_mutex = Mutex.new
        @_child_process_processes = {} # pid => ProcessInfo
      end

      def stop
        @_child_process_mutex.synchronize{ @_child_process_processes.keys }.each do |pid|
          process_info = @_child_process_processes[pid]
          if process_info
            process_info.thread[:_fluentd_plugin_helper_child_process_running] = false
          end
        end

        super
      end

      def shutdown
        @_child_process_mutex.synchronize{ @_child_process_processes.keys }.each do |pid|
          process_info = @_child_process_processes[pid]
          next if !process_info
          process_info.writeio && process_info.writeio.close rescue nil
        end

        super

        @_child_process_mutex.synchronize{ @_child_process_processes.keys }.each do |pid|
          process_info = @_child_process_processes[pid]
          next if !process_info
          child_process_kill(process_info)
        end

        exit_wait_timeout = Fluent::Clock.now + @_child_process_exit_timeout
        while Fluent::Clock.now < exit_wait_timeout
          process_exists = false
          @_child_process_mutex.synchronize{ @_child_process_processes.keys }.each do |pid|
            unless @_child_process_processes[pid].exit_status
              process_exists = true
              break
            end
          end
          if process_exists
            sleep CHILD_PROCESS_LOOP_CHECK_INTERVAL
          else
            break
          end
        end
      end

      def close
        while true
          pids = @_child_process_mutex.synchronize{ @_child_process_processes.keys }
          break if pids.size < 1

          living_process_exist = false
          pids.each do |pid|
            process_info = @_child_process_processes[pid]
            next if !process_info || process_info.exit_status

            living_process_exist = true

            process_info.killed_at ||= Fluent::Clock.now # for illegular case (e.g., created after shutdown)
            timeout_at = process_info.killed_at + @_child_process_kill_timeout
            now = Fluent::Clock.now
            next if now < timeout_at

            child_process_kill(process_info, force: true)
          end

          break if living_process_exist

          sleep CHILD_PROCESS_LOOP_CHECK_INTERVAL
        end

        super
      end

      def terminate
        @_child_process_processes = {}

        super
      end

      def child_process_kill(pinfo, force: false)
        return if !pinfo
        pinfo.killed_at = Fluent::Clock.now unless force

        pid = pinfo.pid
        begin
          if !pinfo.exit_status && child_process_exist?(pid)
            signal = (Fluent.windows? || force) ? :KILL : :TERM
            Process.kill(signal, pinfo.pid)
          end
        rescue Errno::ECHILD, Errno::ESRCH
          # ignore
        end
      end

      ProcessInfo = Struct.new(
        :title,
        :thread, :pid,
        :readio, :readio_in_use, :writeio, :writeio_in_use, :stderrio, :stderrio_in_use,
        :wait_thread, :alive, :killed_at, :exit_status,
        :on_exit_callback, :on_exit_callback_mutex,
      )

      def child_process_execute_once(
          title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir,
          internal_encoding, external_encoding, scrub, replace_string, wait_timeout, on_exit_callback, &block
      )
        spawn_args = if arguments || subprocess_name
                       [ env, (subprocess_name ? [command, subprocess_name] : command), *(arguments || []) ]
                     else
                       [ env, command ]
                     end
        spawn_opts = {
          unsetenv_others: unsetenv,
        }
        if chdir
          spawn_opts[:chdir] = chdir
        end

        encoding_options = {}
        if scrub
          encoding_options[:invalid] = encoding_options[:undef] = :replace
          if replace_string
            encoding_options[:replace] = replace_string
          end
        end

        log.debug "Executing command", title: title, spawn: spawn_args, mode: mode, stderr: stderr

        readio = writeio = stderrio = wait_thread = nil
        readio_in_use = writeio_in_use = stderrio_in_use = false

        if !mode.include?(:stderr) && !mode.include?(:read_with_stderr)
          spawn_opts[:err] = IO::NULL if stderr == :discard
          if !mode.include?(:read) && !mode.include?(:read_with_stderr)
            spawn_opts[:out] = IO::NULL
          end
          writeio, readio, wait_thread = *Open3.popen2(*spawn_args, spawn_opts)
        elsif mode.include?(:read_with_stderr)
          writeio, readio, wait_thread = *Open3.popen2e(*spawn_args, spawn_opts)
        else
          writeio, readio, stderrio, wait_thread = *Open3.popen3(*spawn_args, spawn_opts)
        end

        if mode.include?(:write)
          writeio.set_encoding(external_encoding, internal_encoding, **encoding_options)
          writeio_in_use = true
        else
          writeio.reopen(IO::NULL) if writeio
        end
        if mode.include?(:read) || mode.include?(:read_with_stderr)
          readio.set_encoding(external_encoding, internal_encoding, **encoding_options)
          readio_in_use = true
        end
        if mode.include?(:stderr)
          stderrio.set_encoding(external_encoding, internal_encoding, **encoding_options)
          stderrio_in_use = true
        else
          stderrio.reopen(IO::NULL) if stderrio && stderr == :discard
        end

        pid = wait_thread.pid # wait_thread => Process::Waiter

        io_objects = []
        mode.each do |m|
          io_obj = case m
                   when :read then readio
                   when :write then writeio
                   when :read_with_stderr then readio
                   when :stderr then stderrio
                   else
                     raise "BUG: invalid mode must be checked before here: '#{m}'"
                   end
          io_objects << io_obj
        end

        m = Mutex.new
        m.lock
        thread = thread_create :child_process_callback do
          m.lock # run after plugin thread get pid, thread instance and i/o
          m.unlock
          begin
            @_child_process_processes[pid].alive = true
            block.call(*io_objects) if block_given?
            writeio.close if writeio
          rescue EOFError => e
            log.debug "Process exit and I/O closed", title: title, pid: pid, command: command, arguments: arguments
          rescue IOError => e
            if e.message == 'stream closed' || e.message == 'closed stream' # "closed stream" is of ruby 2.1
              log.debug "Process I/O stream closed", title: title, pid: pid, command: command, arguments: arguments
            else
              log.error "Unexpected I/O error for child process", title: title, pid: pid, command: command, arguments: arguments, error: e
            end
          rescue Errno::EPIPE => e
            log.debug "Broken pipe, child process unexpectedly exits", title: title, pid: pid, command: command, arguments: arguments
          rescue => e
            log.warn "Unexpected error while processing I/O for child process", title: title, pid: pid, command: command, error: e
          end

          if wait_timeout
            if wait_thread.join(wait_timeout) # Thread#join returns nil when limit expires
              # wait_thread successfully exits
              @_child_process_processes[pid].exit_status = wait_thread.value
            else
              log.warn "child process timed out", title: title, pid: pid, command: command, arguments: arguments
              child_process_kill(@_child_process_processes[pid], force: true)
              @_child_process_processes[pid].exit_status = wait_thread.value
            end
          else
            @_child_process_processes[pid].exit_status = wait_thread.value # with join
          end
          process_info = @_child_process_mutex.synchronize{ @_child_process_processes.delete(pid) }

          cb = process_info.on_exit_callback_mutex.synchronize do
            cback = process_info.on_exit_callback
            process_info.on_exit_callback = nil
            cback
          end
          if cb
            cb.call(process_info.exit_status) rescue nil
          end
          process_info.readio&.close rescue nil
          process_info.writeio&.close rescue nil
          process_info.stderrio&.close rescue nil
        end
        thread[:_fluentd_plugin_helper_child_process_running] = true
        thread[:_fluentd_plugin_helper_child_process_pid] = pid
        pinfo = ProcessInfo.new(
          title, thread, pid,
          readio, readio_in_use, writeio, writeio_in_use, stderrio, stderrio_in_use,
          wait_thread, false, nil, nil, on_exit_callback, Mutex.new
        )

        @_child_process_mutex.synchronize do
          @_child_process_processes[pid] = pinfo
        end
        m.unlock
        pid
      end
    end
  end
end