fluent/fluentd

View on GitHub
lib/fluent/plugin/out_exec_filter.rb

Summary

Maintainability
D
2 days
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#
require 'fluent/plugin/output'
require 'fluent/env'
require 'fluent/config/error'

require 'yajl'

module Fluent::Plugin
  class ExecFilterOutput < Output
    Fluent::Plugin.register_output('exec_filter', self)

    helpers :compat_parameters, :inject, :formatter, :parser, :extract, :child_process, :event_emitter

    desc 'The command (program) to execute.'
    config_param :command, :string

    config_param :remove_prefix, :string, default: nil, deprecated: "use @label instead for event routing"
    config_param :add_prefix, :string, default: nil, deprecated: "use @label instead for event routing"

    config_section :inject do
      config_set_default :time_type, :unixtime
    end

    config_section :format do
      config_set_default :@type, 'tsv'
      config_set_default :localtime, true
    end

    config_section :parse do
      config_set_default :@type, 'tsv'
      config_set_default :time_key, nil
      config_set_default :time_format, nil
      config_set_default :localtime, true
      config_set_default :estimate_current_event, false
    end

    config_section :extract do
      config_set_default :time_type, :float
    end

    config_section :buffer do
      config_set_default :flush_mode, :interval
      config_set_default :flush_interval, 1
    end

    config_param :tag, :string, default: nil

    config_param :tag_key, :string, default: nil, deprecated: "use 'tag_key' in <inject>/<extract> instead"
    config_param :time_key, :string, default: nil, deprecated: "use 'time_key' in <inject>/<extract> instead"
    config_param :time_format, :string, default: nil, deprecated: "use 'time_format' in <inject>/<extract> instead"

    desc 'The default block size to read if parser requires partial read.'
    config_param :read_block_size, :size, default: 10240 # 10k

    desc 'The number of spawned process for command.'
    config_param :num_children, :integer, default: 1

    desc 'Respawn command when command exit. ["none", "inf" or positive integer for times to respawn (default: none)]'
    # nil, 'none' or 0: no respawn, 'inf' or -1: infinite times, positive integer: try to respawn specified times only
    config_param :child_respawn, :string, default: nil

    # 0: output logs for all of messages to emit
    config_param :suppress_error_log_interval, :time, default: 0

    attr_reader :formatter, :parser # for tests

    KEYS_FOR_IN_AND_OUT = {
      'tag_key' => ['in_tag_key', 'out_tag_key'],
      'time_key' => ['in_time_key', 'out_time_key'],
      'time_format' => ['in_time_format', 'out_time_format'],
    }
    COMPAT_INJECT_PARAMS = {
      'in_tag_key' => 'tag_key',
      'in_time_key' => 'time_key',
      'in_time_format' => 'time_format',
    }
    COMPAT_FORMAT_PARAMS = {
      'in_format' => '@type',
      'in_keys' => 'keys',
    }
    COMPAT_PARSE_PARAMS = {
      'out_format' => '@type',
      'out_keys' => 'keys',
      'out_stream_buffer_size' => 'stream_buffer_size',
    }
    COMPAT_EXTRACT_PARAMS = {
      'out_tag_key' => 'tag_key',
      'out_time_key' => 'time_key',
      'out_time_format' => 'time_format',
    }

    def exec_filter_compat_parameters_copy_to_subsection!(conf, subsection_name, params)
      return unless conf.elements(subsection_name).empty?
      return unless params.keys.any?{|k| conf.has_key?(k) }
      hash = {}
      params.each_pair do |compat, current|
        hash[current] = conf[compat] if conf.has_key?(compat)
      end
      conf.elements << Fluent::Config::Element.new(subsection_name, '', hash, [])
    end

    def exec_filter_compat_parameters_convert!(conf)
      KEYS_FOR_IN_AND_OUT.each_pair do |inout, keys|
        if conf.has_key?(inout)
          keys.each do |k|
            conf[k] = conf[inout]
          end
        end
      end
      exec_filter_compat_parameters_copy_to_subsection!(conf, 'inject', COMPAT_INJECT_PARAMS)
      exec_filter_compat_parameters_copy_to_subsection!(conf, 'format', COMPAT_FORMAT_PARAMS)
      exec_filter_compat_parameters_copy_to_subsection!(conf, 'parse', COMPAT_PARSE_PARAMS)
      exec_filter_compat_parameters_copy_to_subsection!(conf, 'extract', COMPAT_EXTRACT_PARAMS)
    end

    def configure(conf)
      exec_filter_compat_parameters_convert!(conf)
      compat_parameters_convert(conf, :buffer)

      if inject_section = conf.elements('inject').first
        if inject_section.has_key?('time_format')
          inject_section['time_type'] ||= 'string'
        end
      end
      if extract_section = conf.elements('extract').first
        if extract_section.has_key?('time_format')
          extract_section['time_type'] ||= 'string'
        end
      end

      super

      if !@tag && (!@extract_config || !@extract_config.tag_key)
        raise Fluent::ConfigError, "'tag' or '<extract> tag_key </extract>' option is required on exec_filter output"
      end

      @formatter = formatter_create
      @parser = parser_create

      if @remove_prefix
        @removed_prefix_string = @remove_prefix + '.'
        @removed_length = @removed_prefix_string.length
      end
      if @add_prefix
        @added_prefix_string = @add_prefix + '.'
      end

      @respawns = if @child_respawn.nil? || (@child_respawn == 'none') || (@child_respawn == '0')
                    0
                  elsif (@child_respawn == 'inf') || (@child_respawn == '-1')
                    -1
                  elsif /^\d+$/.match?(@child_respawn)
                    @child_respawn.to_i
                  else
                    raise ConfigError, "child_respawn option argument invalid: none(or 0), inf(or -1) or positive number"
                  end

      @suppress_error_log_interval ||= 0
      @next_log_time = Time.now.to_i
    end

    def multi_workers_ready?
      true
    end

    ExecutedProcess = Struct.new(:mutex, :pid, :respawns, :readio, :writeio)

    def start
      super

      @children_mutex = Mutex.new
      @children = []
      @rr = 0

      exit_callback = ->(status){
        c = @children.find{|child| child.pid == status.pid }
        if c
          unless self.stopped?
            log.warn "child process exits with error code", code: status.to_i, status: status.exitstatus, signal: status.termsig
          end
          c.mutex.synchronize do
            (c.writeio && c.writeio.close) rescue nil
            (c.readio && c.readio.close) rescue nil
            c.pid = c.readio = c.writeio = nil
          end
        end
      }
      child_process_callback = ->(index, readio, writeio){
        pid = child_process_id
        c = @children[index]
        writeio.sync = true
        c.mutex.synchronize do
          c.pid = pid
          c.respawns = @respawns
          c.readio = readio
          c.writeio = writeio
        end

        run(readio)
      }
      execute_child_process = ->(index){
        child_process_execute("out_exec_filter_child#{index}".to_sym, @command, on_exit_callback: exit_callback) do |readio, writeio|
          child_process_callback.call(index, readio, writeio)
        end
      }

      @children_mutex.synchronize do
        @num_children.times do |i|
          @children << ExecutedProcess.new(Mutex.new, nil, 0, nil, nil)
          execute_child_process.call(i)
        end
      end

      if @respawns != 0
        thread_create(:out_exec_filter_respawn_monitor) do
          while thread_current_running?
            @children.each_with_index do |c, i|
              if c.mutex && c.mutex.synchronize{ c.pid.nil? && c.respawns != 0 }
                respawns = c.mutex.synchronize do
                  c.respawns -= 1 if c.respawns > 0
                  c.respawns
                end
                log.info "respawning child process", num: i, respawn_counter: respawns
                execute_child_process.call(i)
              end
            end
            sleep 0.2
          end
        end
      end
    end

    def terminate
      @children = []
      super
    end

    def tag_remove_prefix(tag)
      if @remove_prefix
        if ((tag[0, @removed_length] == @removed_prefix_string) && (tag.length > @removed_length)) || (tag == @removed_prefix_string)
          tag = tag[@removed_length..-1] || ''
        end
      end
      tag
    end

    NEWLINE = "\n"

    def format(tag, time, record)
      tag = tag_remove_prefix(tag)
      record = inject_values_to_record(tag, time, record)
      if @formatter.formatter_type == :text_per_line
        @formatter.format(tag, time, record).chomp + NEWLINE
      else
        @formatter.format(tag, time, record)
      end
    end

    def write(chunk)
      try_times = 0
      while true
        r = @rr = (@rr + 1) % @children.length
        if @children[r].pid && writeio = @children[r].writeio
          chunk.write_to(writeio)
          break
        end
        try_times += 1
        raise "no healthy child processes exist" if try_times >= @children.length
      end
    end

    def run(io)
      io.set_encoding(Encoding::ASCII_8BIT)
      case
      when @parser.implement?(:parse_io)
        @parser.parse_io(io, &method(:on_record))
      when @parser.implement?(:parse_partial_data)
        until io.eof?
          @parser.parse_partial_data(io.readpartial(@read_block_size), &method(:on_record))
        end
      when @parser.parser_type == :text_per_line
        io.each_line do |line|
          @parser.parse(line.chomp, &method(:on_record))
        end
      else
        @parser.parse(io.read, &method(:on_record))
      end
    end

    def on_record(time, record)
      tag = extract_tag_from_record(record)
      tag = @added_prefix_string + tag if tag && @add_prefix
      tag ||= @tag
      time ||= extract_time_from_record(record) || Fluent::EventTime.now
      router.emit(tag, time, record)
    rescue => e
      if @suppress_error_log_interval == 0 || Time.now.to_i > @next_log_time
        log.error "exec_filter failed to emit", record: Yajl.dump(record), error: e
        log.error_backtrace e.backtrace
        @next_log_time = Time.now.to_i + @suppress_error_log_interval
      end
      router.emit_error_event(tag, time, record, e) if tag && time && record
    end
  end
end