lib/fluent/plugin/in_udp_event.rb
# -*- encoding: utf-8 -*-
#
# Fluent Input Plugin UDP Event
#
# Copyright (C) 2013 Alexander Blagoev
module Fluent
# Fluentd UDP input main class
class UdpEventInput < Input
MAX_BLOCKTIME = 2
Plugin.register_input('udp_event', self)
def initialize
super
require 'fluent/plugin/socket_util'
end
config_param :port, :integer, default: 242_24
config_param :bind, :string, default: '0.0.0.0'
config_param :max_message_size, :integer, default: 1024
def configure(conf)
super
end
def start
callback = method(:receive_data)
@loop = Coolio::Loop.new
$log.debug "listening udp socket on #{@bind}:#{@port}"
@usock = SocketUtil.create_udp_socket(@bind)
@usock.bind(@bind, @port)
@handler = UdpHandler.new(@usock, @max_message_size, callback)
@loop.attach(@handler)
@thread = Thread.new(&method(:run))
end
def shutdown
# Force event every MAX_BLOCKTIME seconds to prevent 60 second timeout
# see ext/libev/ev.c MAX_BLOCKTIME
@watcher = Cool.io::TimerWatcher.new(MAX_BLOCKTIME, true)
@loop.attach(@watcher)
$log.debug 'stopping event loop'
begin
@loop.stop
rescue RuntimeError
end
$log.debug "closing udp socket on #{@bind}:#{@port}"
@handler.close
$log.debug 'closing watchers'
@loop.watchers.each { |w| w.detach }
$log.debug 'waiting for thread to finish'
@thread.join
$log.debug 'thread finished'
$log.debug 'terminating'
end
def run
@loop.run
rescue Exception => e
$log.error 'unexpected error', error: e.message
$log.error_backtrace
end
protected
def receive_data(data)
if data.bytesize == @max_message_size
$log.warn "message might be too big and truncated to #{@max_message_size}"
end
begin
parsed = JSON.parse(data)
rescue JSON::ParserError => e
$log.warn 'invalid json data', error: e.message
return
end
tag = parsed[0]
time = parsed[1].to_i
record = parsed[2]
if tag.nil? || time.nil? || record.nil?
$log.warn "invalid message supplied #{data}"
return
end
time ||= Engine.now
Engine.emit(tag, time, record)
rescue Exception => e
$log.warn data.dump, error: e.message
$log.debug_backtrace
end
private
# Class to handle the UDP layer
class UdpHandler < Coolio::IO
def initialize(io, max_message_size, callback)
super(io)
@io = io
@callback = callback
@max_message_size = max_message_size
end
def on_readable
msg, _ = @io.recvfrom_nonblock(@max_message_size)
@callback.call(msg)
rescue Exception => e
$log.error e.message, error: e.message
end
end
end
end