benchmark/plugin/in_forward.rb
#
# Fluent
#
# Copyright (C) 2011 FURUHASHI Sadayuki
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Fluent
class ForwardInput < Input
Plugin.register_input('forward', self)
# Runs the superclass initializer and loads the socket helper library when
# the plugin is instantiated.
def initialize
super
# Loaded lazily here instead of at file load time; SocketUtil is used by #start.
require 'fluent/plugin/socket_util'
end
# Port used for both the TCP listener and the UDP heartbeat socket.
config_param :port, :integer, :default => DEFAULT_LISTEN_PORT
# Local address both sockets are bound to.
config_param :bind, :string, :default => '0.0.0.0'
# Backlog handed to the TCP server's #listen; when nil, #listen is not called explicitly.
config_param :backlog, :integer, :default => nil
# SO_LINGER 0 to send RST rather than FIN to avoid lots of connections sitting in TIME_WAIT at src
config_param :linger_timeout, :integer, :default => 0
# ElapsedMeasure instance created by #configure when an <elapsed> section exists, otherwise nil.
attr_reader :elapsed # for test
# Configures the plugin and, when the configuration contains an <elapsed>
# section, creates the ElapsedMeasure used to time message handling.
#
# Attributes read from the <elapsed> section:
#   tag      - tag the statistics are emitted with (default: 'elapsed')
#   interval - flush interval handed to ElapsedMeasure (default: 60)
#   hook     - comma-separated hook points to measure (default: 'on_message')
#
# conf - configuration element; must respond to #elements.
#
# Returns the new ElapsedMeasure, or nil when no <elapsed> section exists.
def configure(conf)
  super

  # Array#first ignores a block and would always return the first child
  # element, whatever its name; #find really selects the <elapsed> section.
  section = conf.elements.find { |child| child.name == 'elapsed' }
  return unless section

  tag = section['tag'] || 'elapsed'
  # Apply the default before converting: nil.to_i is 0, and 0 is truthy in
  # Ruby, so `value.to_i || 60` could never fall back to 60.
  interval = (section['interval'] || 60).to_i
  hook = section['hook'] || 'on_message'
  @elapsed = ElapsedMeasure.new(log, tag, interval, hook)
end
# Starts the plugin: builds the Cool.io event loop, attaches the TCP listener
# and the UDP heartbeat responder to it, and runs the loop in a background thread.
def start
@loop = Coolio::Loop.new
# TCP server accepting forward connections (built by #listen).
@lsock = listen
@loop.attach(@lsock)
# UDP socket on the same address/port; heartbeat requests arrive here.
@usock = SocketUtil.create_udp_socket(@bind)
@usock.bind(@bind, @port)
# Switch the UDP socket to non-blocking mode.
@usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
@hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request))
@loop.attach(@hbr)
# #run drives the event loop on its own thread.
@thread = Thread.new(&method(:run))
# @elapsed is only set when #configure found an <elapsed> section.
@elapsed.start if @elapsed
# Unpacker handed to MessagePackEventStream for PackedForward payloads; nil when $use_msgpack_5 is set.
# NOTE(review): assigned after the loop thread is already running - confirm no message can be handled before this line.
@cached_unpacker = $use_msgpack_5 ? nil : MessagePack::Unpacker.new
end
# Stops the plugin: detaches every watcher, stops the event loop, closes the
# sockets, waits for the loop thread and finally stops the elapsed-time reporter.
def shutdown
@loop.watchers.each {|w| w.detach }
@loop.stop
@usock.close
# When bound to the wildcard address, connect through loopback instead.
listen_address = (@bind == '0.0.0.0' ? '127.0.0.1' : @bind)
# Connect once to the listen socket so that the event loop wakes up and can stop.
# A better approach would be e.g. using a pipe, or fixing cool.io to support a timeout.
TCPSocket.open(listen_address, @port) {|sock| } # FIXME @thread.join blocks without this line
@thread.join
@lsock.close
@elapsed.stop if @elapsed
end
# Records how long hooked sections of the plugin take and how many bytes they
# handled, and periodically emits aggregated statistics through Engine.emit.
class ElapsedMeasure
  attr_reader :tag, :interval, :hook, :times, :sizes, :mutex, :thread, :log

  # log      - logger responding to #debug and #warn
  # tag      - tag the statistics are emitted with
  # interval - minimum difference of Engine.now between two flushes
  # hook     - comma-separated names of the hook points to measure,
  #            e.g. "on_message,on_read"
  def initialize(log, tag, interval, hook)
    @log = log
    @tag = tag
    @interval = interval
    # Strip blanks so that "on_message, on_read" enables both hooks.
    @hook = hook.split(',').map { |name| name.strip }
    @times = []
    @sizes = []
    @mutex = Mutex.new
  end

  # Appends one sample. Not synchronized; callers hold @mutex.
  def add(time, size)
    @times << time
    @sizes << size
  end

  # Drops all collected samples. Not synchronized; callers hold @mutex.
  def clear
    @times.clear
    @sizes.clear
  end

  # Returns true when the given hook point (symbol or string) is measured.
  def hookable?(caller)
    @hook.include?(caller.to_s)
  end

  # Runs the given block. When +caller+ is a measured hook point, the wall
  # clock time of the block is logged at debug level and recorded together
  # with +size+; otherwise the block is simply executed.
  def measure_time(caller, size)
    if hookable?(caller)
      started = Time.now
      yield
      elapsed = (Time.now - started).to_f
      log.debug "in_forward: elapsed time at #{caller} is #{elapsed} sec for #{size} bytes"
      @mutex.synchronize { add(elapsed, size) }
    else
      yield
    end
  end

  # Starts the background thread that periodically flushes the statistics.
  def start
    @thread = Thread.new(&method(:run))
  end

  # Terminates the background thread. Does nothing when #start was never
  # called, so an unstarted instance can be stopped safely.
  def stop
    return unless @thread
    @thread.terminate
    @thread.join
  end

  # Thread body: wakes up every 0.5 seconds and flushes once at least
  # @interval has passed (measured with Engine.now) since the last flush.
  # Errors are logged as warnings and do not end the loop.
  def run
    @last_checked ||= Engine.now
    # Kernel#sleep returns an Integer, which is always truthy, so this loops
    # until the thread is terminated by #stop.
    while (sleep 0.5)
      begin
        now = Engine.now
        if now - @last_checked >= @interval
          flush(now)
          @last_checked = now
        end
      rescue => e
        log.warn "in_forward: #{e.class} #{e.message} #{e.backtrace.first}"
      end
    end
  end

  # Takes all samples collected so far and, unless there are none, emits
  # their count, maximum and average (elapsed time and size) with time +now+.
  def flush(now)
    times = sizes = nil
    # Copy and reset under the lock; aggregate outside of it.
    @mutex.synchronize do
      times = @times.dup
      sizes = @sizes.dup
      clear
    end
    return if times.empty? || sizes.empty?

    num = times.size
    stats = {
      :num => num,
      :max => times.max,
      :avg => times.map(&:to_f).inject(:+) / num.to_f,
      :size_max => sizes.max,
      :size_avg => sizes.map(&:to_f).inject(:+) / num.to_f
    }
    Engine.emit(@tag, now, stats)
  end
end
# Creates the Cool.io TCP server that accepts forward connections. Every
# accepted connection is served by a Handler that receives the linger
# timeout, the logger, the #on_message callback and the elapsed measure.
#
# Returns the server; its #listen is only called when a backlog is configured.
def listen
  log.info "listening fluent socket on #{@bind}:#{@port}"
  Coolio::TCPServer.new(@bind, @port, Handler, @linger_timeout, log, method(:on_message), @elapsed).tap do |server|
    server.listen(@backlog) unless @backlog.nil?
  end
end
#config_param :path, :string, :default => DEFAULT_SOCKET_PATH
#def listen
# if File.exist?(@path)
# File.unlink(@path)
# end
# FileUtils.mkdir_p File.dirname(@path)
# log.debug "listening fluent socket on #{@path}"
# Coolio::UNIXServer.new(@path, Handler, method(:on_message))
#end
# Body of the plugin thread: runs the event loop. Any StandardError escaping
# the loop is logged together with its backtrace instead of being raised.
def run
  begin
    @loop.run
  rescue => error
    log.error "unexpected error", :error => error, :error_class => error.class
    log.error_backtrace
  end
end
protected
# message Entry {
# 1: long time
# 2: object record
# }
#
# message Forward {
# 1: string tag
# 2: list<Entry> entries
# }
#
# message PackedForward {
# 1: string tag
# 2: raw entries # msgpack stream of Entry
# }
#
# message Message {
# 1: string tag
# 2: long? time
# 3: object record
# }
# Handles one decoded message and emits its events. The shape of msg[1]
# selects the mode:
#   String      - PackedForward: msg[1] is handed to MessagePackEventStream
#   Array       - Forward: msg[1] is a list of [time, record] entries
#   otherwise   - Message: msg is [tag, time, record]
# A time of 0 (or a missing time) is replaced by Engine.now; entries and
# messages whose record is nil are skipped.
#
# msg - Array-like decoded message, or nil (ignored).
def on_message(msg)
  # for future TCP heartbeat_request
  return if msg.nil?

  # TODO format error
  tag = msg[0].to_s
  entries = msg[1]

  case entries
  when String
    # PackedForward
    bytesize = tag.bytesize + entries.bytesize
    measure_time(:on_message, bytesize) do
      es = MessagePackEventStream.new(entries, @cached_unpacker)
      Engine.emit_stream(tag, es)
    end
  when Array
    # Forward
    # Declared outside the block: a variable first assigned inside the block
    # is block-local, so `now ||= Engine.now` would never reuse the value.
    now = nil
    es = MultiEventStream.new
    entries.each do |entry|
      record = entry[1]
      next if record.nil?
      time = entry[0].to_i
      time = (now ||= Engine.now) if time == 0
      es.add(time, record)
    end
    measure_time(:on_message, 0) do
      Engine.emit_stream(tag, es)
    end
  else
    # Message
    record = msg[2]
    return if record.nil?
    # The time field is optional; #to_i maps nil to 0 (as in Forward mode) so
    # that a missing time falls back to Engine.now instead of raising.
    time = msg[1].to_i
    time = Engine.now if time == 0
    bytesize = time.size + record.to_s.bytesize
    measure_time(:on_message, bytesize) do
      Engine.emit(tag, time, record)
    end
  end
end
# Runs the given block, timing it through the ElapsedMeasure when one is
# configured; without one the block is simply executed.
def measure_time(caller, size)
  return yield unless @elapsed

  @elapsed.measure_time(caller, size) { yield }
end
# Per-connection socket handler. The first chunk received decides whether the
# peer sends JSON or MessagePack; every decoded message is passed to the
# on_message callback.
class Handler < Coolio::Socket
  # io             - accepted socket
  # linger_timeout - SO_LINGER timeout applied when io is a TCPSocket
  # log            - logger
  # on_message     - callable invoked with each decoded message
  # elapsed        - ElapsedMeasure or nil
  def initialize(io, linger_timeout, log, on_message, elapsed)
    super(io)
    if io.is_a?(TCPSocket)
      opt = [1, linger_timeout].pack('I!I!') # { int l_onoff; int l_linger; }
      io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
    end
    @on_message = on_message
    @elapsed = elapsed
    @log = log
    @log.trace {
      remote_port, remote_addr = *Socket.unpack_sockaddr_in(@_io.getpeername) rescue nil
      "accepted fluent socket from '#{remote_addr}:#{remote_port}': object_id=#{self.object_id}"
    }
  end

  def on_connect
  end

  # Called for the first chunk only: picks the reader from the first
  # character ('{' or '[' means JSON, anything else MessagePack), installs it
  # as this object's own #on_read for all later chunks, and processes the chunk.
  def on_read(data)
    reader =
      case data[0]
      when '{', '['
        @y = Yajl::Parser.new
        @y.on_parse_complete = @on_message
        method(:on_read_json)
      else
        @u = MessagePack::Unpacker.new
        method(:on_read_msgpack)
      end
    (class << self; self; end).module_eval do
      define_method(:on_read, reader)
    end
    reader.call(data)
  end

  # Runs the block, timing it through the ElapsedMeasure when one was given.
  def measure_time(caller, size)
    if @elapsed
      @elapsed.measure_time(caller, size) { yield }
    else
      yield
    end
  end

  # Feeds a chunk to the JSON parser; on error logs it and closes the socket.
  def on_read_json(data)
    begin
      measure_time(:on_read, data.bytesize) { @y << data }
    rescue => error
      @log.error "forward error", :error => error, :error_class => error.class
      @log.error_backtrace
      close
    end
  end

  # Feeds a chunk to the MessagePack unpacker, calling on_message for each
  # complete object; on error logs it and closes the socket.
  def on_read_msgpack(data)
    begin
      measure_time(:on_read, data.bytesize) { @u.feed_each(data, &@on_message) }
    rescue => error
      @log.error "forward error", :error => error, :error_class => error.class
      @log.error_backtrace
      close
    end
  end

  def on_close
    @log.trace { "closed fluent socket object_id=#{self.object_id}" }
  end
end
# Event-loop watcher for the UDP heartbeat socket: reads one datagram when the
# socket becomes readable and passes the sender's host, port and the payload
# to the callback.
class HeartbeatRequestHandler < Coolio::IO
  # io       - UDP socket to read from
  # callback - callable invoked as callback.call(host, port, msg)
  def initialize(io, callback)
    super(io)
    @io, @callback = io, callback
  end

  # Reads at most 1024 bytes. Returns quietly when nothing can be read right
  # now; any other StandardError is swallowed as well.
  def on_readable
    begin
      msg, sender = @io.recvfrom(1024)
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
      return
    end
    # recvfrom's address info holds the port at index 1 and the host at index 3.
    @callback.call(sender[3], sender[1], msg)
  rescue
    # TODO log?
  end
end
# Answers a heartbeat request by sending a single NUL byte back to the sender
# over the UDP socket. The reply is best effort: when the socket cannot send
# right now, the error is ignored and nil is returned.
def on_heartbeat_request(host, port, msg)
  #log.trace "heartbeat request from #{host}:#{port}"
  @usock.send "\0", 0, host, port
rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
  nil
end
end
end