sonots/fluent-mixin-elapsed_time

View on GitHub
benchmark/plugin/in_forward.rb

Summary

Maintainability
B
6 hrs
Test Coverage
#
# Fluent
#
# Copyright (C) 2011 FURUHASHI Sadayuki
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#
module Fluent


  class ForwardInput < Input
    Plugin.register_input('forward', self)

    def initialize
      super
      require 'fluent/plugin/socket_util'
    end

    config_param :port, :integer, :default => DEFAULT_LISTEN_PORT
    config_param :bind, :string, :default => '0.0.0.0'
    config_param :backlog, :integer, :default => nil
    # SO_LINGER 0 to send RST rather than FIN to avoid lots of connections sitting in TIME_WAIT at src
    config_param :linger_timeout, :integer, :default => 0
    attr_reader :elapsed # for test

    def configure(conf)
      super

      if element = conf.elements.first { |element| element.name == 'elapsed' }
        tag = element["tag"] || 'elapsed'
        interval = element["interval"].to_i || 60
        hook = element['hook'] || 'on_message'
        @elapsed = ElapsedMeasure.new(log, tag, interval, hook)
      end
    end

    def start
      @loop = Coolio::Loop.new

      @lsock = listen
      @loop.attach(@lsock)

      @usock = SocketUtil.create_udp_socket(@bind)
      @usock.bind(@bind, @port)
      @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
      @hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request))
      @loop.attach(@hbr)

      @thread = Thread.new(&method(:run))
      @elapsed.start if @elapsed
      @cached_unpacker = $use_msgpack_5 ? nil : MessagePack::Unpacker.new
    end

    def shutdown
      @loop.watchers.each {|w| w.detach }
      @loop.stop
      @usock.close
      listen_address = (@bind == '0.0.0.0' ? '127.0.0.1' : @bind)
      # This line is for connecting listen socket to stop the event loop.
      # We should use more better approach, e.g. using pipe, fixing cool.io with timeout, etc.
      TCPSocket.open(listen_address, @port) {|sock| } # FIXME @thread.join blocks without this line
      @thread.join
      @lsock.close
      @elapsed.stop if @elapsed
    end

    class ElapsedMeasure
      attr_reader :tag, :interval, :hook, :times, :sizes, :mutex, :thread, :log
      def initialize(log, tag, interval, hook)
        @log = log
        @tag = tag
        @interval = interval
        @hook = hook.split(',')
        @times = []
        @sizes = []
        @mutex = Mutex.new
      end

      def add(time, size)
        @times << time
        @sizes << size
      end

      def clear
        @times.clear
        @sizes.clear
      end

      def hookable?(caller)
        @hook.include?(caller.to_s)
      end

      def measure_time(caller, size)
        if hookable?(caller)
          started = Time.now
          yield
          elapsed = (Time.now - started).to_f
          log.debug "in_forward: elapsed time at #{caller} is #{elapsed} sec for #{size} bytes"
          @mutex.synchronize { self.add(elapsed, size) }
        else
          yield
        end
      end

      def start
        @thread = Thread.new(&method(:run))
      end

      def stop
        @thread.terminate
        @thread.join
      end

      def run
        @last_checked ||= Engine.now
        while (sleep 0.5)
          begin
            now = Engine.now
            if now - @last_checked >= @interval
              flush(now)
              @last_checked = now
            end
          rescue => e
            log.warn "in_forward: #{e.class} #{e.message} #{e.backtrace.first}"
          end
        end
      end

      def flush(now)
        times, sizes = [], []
        @mutex.synchronize do
          times = @times.dup
          sizes = @sizes.dup
          self.clear
        end
        if !times.empty? and !sizes.empty?
          num = times.size
          max = num == 0 ? 0 : times.max
          avg = num == 0 ? 0 : times.map(&:to_f).inject(:+) / num.to_f
          size_max = num == 0 ? 0 : sizes.max
          size_avg = num == 0 ? 0 : sizes.map(&:to_f).inject(:+) / num.to_f
          Engine.emit(@tag, now, {:num => num, :max => max, :avg => avg, :size_max => size_max, :size_avg => size_avg})
        end
      end
    end

    def listen
      log.info "listening fluent socket on #{@bind}:#{@port}"
      s = Coolio::TCPServer.new(@bind, @port, Handler, @linger_timeout, log, method(:on_message), @elapsed)
      s.listen(@backlog) unless @backlog.nil?
      s
    end

    #config_param :path, :string, :default => DEFAULT_SOCKET_PATH
    #def listen
    #  if File.exist?(@path)
    #    File.unlink(@path)
    #  end
    #  FileUtils.mkdir_p File.dirname(@path)
    #  log.debug "listening fluent socket on #{@path}"
    #  Coolio::UNIXServer.new(@path, Handler, method(:on_message))
    #end

    def run
      @loop.run
    rescue => e
      log.error "unexpected error", :error => e, :error_class => e.class
      log.error_backtrace
    end

    protected
    # message Entry {
    #   1: long time
    #   2: object record
    # }
    #
    # message Forward {
    #   1: string tag
    #   2: list<Entry> entries
    # }
    #
    # message PackedForward {
    #   1: string tag
    #   2: raw entries  # msgpack stream of Entry
    # }
    #
    # message Message {
    #   1: string tag
    #   2: long? time
    #   3: object record
    # }
    def on_message(msg)
      if msg.nil?
        # for future TCP heartbeat_request
        return
      end

      # TODO format error
      tag = msg[0].to_s
      entries = msg[1]

      if entries.class == String
        # PackedForward
        bytesize = tag.bytesize + entries.bytesize
        measure_time(:on_message, bytesize) do
          es = MessagePackEventStream.new(entries, @cached_unpacker)
          Engine.emit_stream(tag, es)
        end

      elsif entries.class == Array
        # Forward
        es = MultiEventStream.new
        entries.each {|e|
          record = e[1]
          next if record.nil?
          time = e[0].to_i
          time = (now ||= Engine.now) if time == 0
          es.add(time, record)
        }
        measure_time(:on_message, 0) do
          Engine.emit_stream(tag, es)
        end

      else
        # Message
        record = msg[2]
        return if record.nil?
        time = msg[1]
        time = Engine.now if time == 0
        bytesize = time.size + record.to_s.bytesize
        measure_time(:on_message, bytesize) do
          Engine.emit(tag, time, record)
        end
      end
    end

    def measure_time(caller, size)
      @elapsed ? @elapsed.measure_time(caller, size) { yield } : yield
    end

    class Handler < Coolio::Socket
      def initialize(io, linger_timeout, log, on_message, elapsed)
        super(io)
        if io.is_a?(TCPSocket)
          opt = [1, linger_timeout].pack('I!I!')  # { int l_onoff; int l_linger; }
          io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
        end
        @on_message = on_message
        @log = log
        @log.trace {
          remote_port, remote_addr = *Socket.unpack_sockaddr_in(@_io.getpeername) rescue nil
          "accepted fluent socket from '#{remote_addr}:#{remote_port}': object_id=#{self.object_id}"
        }
        @elapsed = elapsed
      end

      def on_connect
      end

      def on_read(data)
        first = data[0]
        if first == '{' || first == '['
          m = method(:on_read_json)
          @y = Yajl::Parser.new
          @y.on_parse_complete = @on_message
        else
          m = method(:on_read_msgpack)
          @u = MessagePack::Unpacker.new
        end

        (class << self; self; end).module_eval do
          define_method(:on_read, m)
        end
        m.call(data)
      end

      def measure_time(caller, size)
        @elapsed ? @elapsed.measure_time(caller, size) { yield } : yield
      end

      def on_read_json(data)
        measure_time(:on_read, data.bytesize) do
          @y << data
        end
      rescue => e
        @log.error "forward error", :error => e, :error_class => e.class
        @log.error_backtrace
        close
      end

      def on_read_msgpack(data)
        measure_time(:on_read, data.bytesize) do
          @u.feed_each(data, &@on_message)
        end
      rescue => e
        @log.error "forward error", :error => e, :error_class => e.class
        @log.error_backtrace
        close
      end

      def on_close
        @log.trace { "closed fluent socket object_id=#{self.object_id}" }
      end
    end

    class HeartbeatRequestHandler < Coolio::IO
      def initialize(io, callback)
        super(io)
        @io = io
        @callback = callback
      end

      def on_readable
        begin
          msg, addr = @io.recvfrom(1024)
        rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
          return
        end
        host = addr[3]
        port = addr[1]
        @callback.call(host, port, msg)
      rescue
        # TODO log?
      end
    end

    def on_heartbeat_request(host, port, msg)
      #log.trace "heartbeat request from #{host}:#{port}"
      begin
        @usock.send "\0", 0, host, port
      rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
      end
    end
  end
end