lib/fluent/plugin/in_syslog.rb
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'fluent/plugin/input'
require 'fluent/config/error'
require 'fluent/plugin/parser'
require 'yajl'
module Fluent::Plugin
class SyslogInput < Input
Fluent::Plugin.register_input('syslog', self)
helpers :parser, :compat_parameters, :server
DEFAULT_PARSER = 'syslog'
SYSLOG_REGEXP = /^\<([0-9]+)\>(.*)/
FACILITY_MAP = {
0 => 'kern',
1 => 'user',
2 => 'mail',
3 => 'daemon',
4 => 'auth',
5 => 'syslog',
6 => 'lpr',
7 => 'news',
8 => 'uucp',
9 => 'cron',
10 => 'authpriv',
11 => 'ftp',
12 => 'ntp',
13 => 'audit',
14 => 'alert',
15 => 'at',
16 => 'local0',
17 => 'local1',
18 => 'local2',
19 => 'local3',
20 => 'local4',
21 => 'local5',
22 => 'local6',
23 => 'local7'
}
SEVERITY_MAP = {
0 => 'emerg',
1 => 'alert',
2 => 'crit',
3 => 'err',
4 => 'warn',
5 => 'notice',
6 => 'info',
7 => 'debug'
}
desc 'The port to listen to.'
config_param :port, :integer, default: 5140
desc 'The bind address to listen to.'
config_param :bind, :string, default: '0.0.0.0'
desc 'The prefix of the tag. The tag itself is generated by the tag prefix, facility level, and priority.'
config_param :tag, :string
desc 'The transport protocol used to receive logs.(udp, tcp)'
config_param :protocol_type, :enum, list: [:tcp, :udp], default: nil, deprecated: "use transport directive"
desc 'The message frame type.(traditional, octet_count)'
config_param :frame_type, :enum, list: [:traditional, :octet_count], default: :traditional
desc 'If true, add source host to event record.'
config_param :include_source_host, :bool, default: false, deprecated: 'use "source_hostname_key" or "source_address_key" instead.'
desc 'Specify key of source host when include_source_host is true.'
config_param :source_host_key, :string, default: 'source_host'.freeze
desc 'Enable the option to emit unmatched lines.'
config_param :emit_unmatched_lines, :bool, default: false
desc 'The field name of hostname of sender.'
config_param :source_hostname_key, :string, default: nil
desc 'Try to resolve hostname from IP addresses or not.'
config_param :resolve_hostname, :bool, default: nil
desc 'Check the remote connection is still available by sending a keepalive packet if this value is true.'
config_param :send_keepalive_packet, :bool, default: false
desc 'The field name of source address of sender.'
config_param :source_address_key, :string, default: nil
desc 'The field name of the severity.'
config_param :severity_key, :string, default: nil, alias: :priority_key
desc 'The field name of the facility.'
config_param :facility_key, :string, default: nil
desc "The max bytes of message"
config_param :message_length_limit, :size, default: 2048
config_param :blocking_timeout, :time, default: 0.5
desc 'The delimiter value "\n"'
config_param :delimiter, :string, default: "\n" # syslog family add "\n" to each message
config_section :parse do
config_set_default :@type, DEFAULT_PARSER
config_param :with_priority, :bool, default: true
end
# overwrite server plugin to change default to :udp
config_section :transport, required: false, multi: false, init: true, param_name: :transport_config do
config_argument :protocol, :enum, list: [:tcp, :udp, :tls], default: :udp
end
def configure(conf)
compat_parameters_convert(conf, :parser)
super
if conf.has_key?('priority_key')
log.warn "priority_key is deprecated. Use severity_key instead"
end
@use_default = false
@parser = parser_create
@parser_parse_priority = @parser.respond_to?(:with_priority) && @parser.with_priority
if @include_source_host
if @source_address_key
raise Fluent::ConfigError, "specify either source_address_key or include_source_host"
end
@source_address_key = @source_host_key
end
if @source_hostname_key
if @resolve_hostname.nil?
@resolve_hostname = true
elsif !@resolve_hostname # user specifies "false" in config
raise Fluent::ConfigError, "resolve_hostname must be true with source_hostname_key"
end
end
@_event_loop_run_timeout = @blocking_timeout
protocol = @protocol_type || @transport_config.protocol
if @send_keepalive_packet && protocol == :udp
raise Fluent::ConfigError, "send_keepalive_packet is available for tcp/tls"
end
end
def multi_workers_ready?
true
end
def start
super
log.info "listening syslog socket on #{@bind}:#{@port} with #{@protocol_type || @transport_config.protocol}"
case @protocol_type || @transport_config.protocol
when :udp then start_udp_server
when :tcp then start_tcp_server
when :tls then start_tcp_server(tls: true)
else
raise "BUG: invalid transport value: #{@protocol_type || @transport_config.protocol}"
end
end
def start_udp_server
server_create_udp(:in_syslog_udp_server, @port, bind: @bind, max_bytes: @message_length_limit, resolve_name: @resolve_hostname) do |data, sock|
message_handler(data.chomp, sock)
end
end
def start_tcp_server(tls: false)
octet_count_frame = @frame_type == :octet_count
delimiter = octet_count_frame ? " " : @delimiter
delimiter_size = delimiter.size
server_create_connection(
tls ? :in_syslog_tls_server : :in_syslog_tcp_server, @port,
bind: @bind,
resolve_name: @resolve_hostname,
send_keepalive_packet: @send_keepalive_packet
) do |conn|
conn.data do |data|
buffer = conn.buffer
buffer << data
pos = 0
if octet_count_frame
while idx = buffer.index(delimiter, pos)
num = Integer(buffer[pos..idx])
msg = buffer[idx + delimiter_size, num]
if msg.size != num
break
end
pos = idx + delimiter_size + num
message_handler(msg, conn)
end
else
while idx = buffer.index(delimiter, pos)
msg = buffer[pos...idx]
pos = idx + delimiter_size
message_handler(msg, conn)
end
end
buffer.slice!(0, pos) if pos > 0
end
end
end
private
def emit_unmatched(data, sock)
record = {"unmatched_line" => data}
record[@source_address_key] = sock.remote_addr if @source_address_key
record[@source_hostname_key] = sock.remote_host if @source_hostname_key
emit("#{@tag}.unmatched", Fluent::EventTime.now, record)
end
def message_handler(data, sock)
pri = nil
text = data
unless @parser_parse_priority
m = SYSLOG_REGEXP.match(data)
unless m
if @emit_unmatched_lines
emit_unmatched(data, sock)
end
log.warn "invalid syslog message: #{data.dump}"
return
end
pri = m[1].to_i
text = m[2]
end
@parser.parse(text) do |time, record|
unless time && record
if @emit_unmatched_lines
emit_unmatched(data, sock)
end
log.warn "failed to parse message", data: data
return
end
pri ||= record.delete('pri')
facility = FACILITY_MAP[pri >> 3]
severity = SEVERITY_MAP[pri & 0b111]
record[@severity_key] = severity if @severity_key
record[@facility_key] = facility if @facility_key
record[@source_address_key] = sock.remote_addr if @source_address_key
record[@source_hostname_key] = sock.remote_host if @source_hostname_key
tag = "#{@tag}.#{facility}.#{severity}"
emit(tag, time, record)
end
rescue => e
if @emit_unmatched_lines
emit_unmatched(data, sock)
end
log.error "invalid input", data: data, error: e
log.error_backtrace
end
def emit(tag, time, record)
router.emit(tag, time, record)
rescue => e
log.error "syslog failed to emit", error: e, tag: tag, record: Yajl.dump(record)
end
end
end