fluent/fluentd

View on GitHub
lib/fluent/compat/socket_util.rb

Summary

Maintainability
A
55 mins
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'ipaddr'

require 'cool.io'

require 'fluent/plugin'
require 'fluent/input'

module Fluent
  module Compat
    module SocketUtil
      # Builds an unbound UDP socket whose address family matches +host+.
      #
      # +host+ is resolved with IPSocket.getaddress. An IPv4 result yields an
      # AF_INET socket; any other result yields an AF_INET6 socket. Errors
      # raised during resolution or socket creation are not rescued here.
      #
      # @param host [String] host name or IP address literal
      # @return [UDPSocket]
      def create_udp_socket(host)
        resolved = IPAddr.new(IPSocket.getaddress(host))
        family = resolved.ipv4? ? Socket::AF_INET : Socket::AF_INET6
        UDPSocket.new(family)
      end
      module_function :create_udp_socket

      # Coolio::IO subclass that reads one UDP datagram per readiness
      # notification and hands it to a callback.
      class UdpHandler < Coolio::IO
        # @param io [#recvfrom_nonblock] socket the datagrams are read from
        # @param log [#error] logger used to report failures
        # @param body_size_limit [Integer] maximum length passed to recvfrom_nonblock
        # @param callback [#call] invoked as callback.call(msg, addr)
        def initialize(io, log, body_size_limit, callback)
          super(io)
          @io = io
          @log = log
          @body_size_limit = body_size_limit
          @callback = callback
        end

        # Reads a single datagram, strips its trailing line terminator in place,
        # and passes the message together with the sender address to the
        # callback. Any StandardError raised while doing so is logged rather
        # than propagated.
        def on_readable
          begin
            msg, addr = @io.recvfrom_nonblock(@body_size_limit)
          rescue IO::WaitReadable
            # A readiness notification without a datagram behind it is not a
            # failure: there is simply nothing to read yet, so wait for the
            # next notification instead of logging an error.
            return
          end
          msg.chomp!
          @callback.call(msg, addr)
        rescue => e
          @log.error "unexpected error", error: e
        end
      end

      # Coolio::Socket subclass that accumulates incoming bytes, splits them on
      # a delimiter, and hands every complete message to a callback.
      class TcpHandler < Coolio::Socket
        # Stand-in for the peer address when io.peeraddr raises.
        PEERADDR_FAILED = ["?", "?", "name resolution failed", "?"]

        # @param io [IO] connection to wrap; the peer address lookup and the
        #   SO_LINGER option are applied only when it is a TCPSocket
        # @param log [#trace, #error] logger
        # @param delimiter [String] separator marking the end of each message
        # @param callback [#call] invoked as callback.call(msg, addr)
        def initialize(io, log, delimiter, callback)
          super(io)
          @timeout = 0
          if io.is_a?(TCPSocket)
            @addr =
              begin
                io.peeraddr
              rescue
                PEERADDR_FAILED
              end

            # struct linger { int l_onoff; int l_linger; }
            linger = [1, @timeout.to_i].pack('I!I!')
            io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, linger)
          end
          @log = log
          @callback = callback
          @delimiter = delimiter
          @log.trace { "accepted fluent socket object_id=#{self.object_id}" }
          @buffer = "".force_encoding('ASCII-8BIT')
        end

        # Nothing to do when the connection is established.
        def on_connect; end

        # Appends +data+ to the internal buffer and invokes the callback once
        # per delimiter-terminated message. Bytes after the last delimiter stay
        # buffered for the next call. If anything raises, the error is logged
        # and the connection is closed.
        #
        # @param data [String] bytes just received
        def on_read(data)
          @buffer << data
          consumed = 0

          while (hit = @buffer.index(@delimiter, consumed))
            @callback.call(@buffer[consumed...hit], @addr)
            consumed = hit + @delimiter.length
          end

          # Drop everything that has already been delivered to the callback.
          @buffer.slice!(0, consumed) if consumed > 0
        rescue => e
          @log.error "unexpected error", error: e
          close
        end

        # Emits a trace line when the connection is closed.
        def on_close
          @log.trace { "closed fluent socket object_id=#{self.object_id}" }
        end
      end

      # Shared base for socket-driven inputs: parses each received message with
      # the configured parser and emits the result under a fixed tag.
      # NOTE(review): #listen is not defined in this class; presumably
      # subclasses provide it and return a Coolio watcher -- confirm.
      class BaseInput < Fluent::Input
        desc 'Tag of output events.'
        config_param :tag, :string
        desc 'The format of the payload.'
        config_param :format, :string
        desc 'The port to listen to.'
        config_param :port, :integer, default: 5150
        desc 'The bind address to listen to.'
        config_param :bind, :string, default: '0.0.0.0'
        desc "The field name of the client's hostname."
        config_param :source_host_key, :string, default: nil
        # Value handed to Coolio::Loop#run in #run.
        config_param :blocking_timeout, :time, default: 0.5

        def initialize
          super
          # Loaded here rather than at file load time.
          require 'fluent/parser'
        end

        # Builds and configures the parser named by the +format+ parameter.
        def configure(conf)
          super

          @parser = Plugin.new_parser(@format)
          @parser.configure(conf)
        end

        # Creates the event loop, attaches the handler returned by #listen, and
        # runs the loop on a dedicated thread.
        def start
          super

          @loop = Coolio::Loop.new
          @handler = listen(method(:on_message))
          @loop.attach(@handler)
          @thread = Thread.new { run }
        end

        # Detaches all watchers, stops the loop if it is flagged as running,
        # closes the handler, and waits for the loop thread to finish.
        def shutdown
          @loop.watchers.each(&:detach)
          @loop.stop if @loop.instance_variable_get("@running")
          @handler.close
          @thread.join

          super
        end

        # Thread body: runs the event loop, logging any StandardError that
        # escapes it.
        def run
          @loop.run(@blocking_timeout)
        rescue => e
          log.error "unexpected error", error: e
          log.error_backtrace
        end

        private

        # Parses +msg+ and emits every parsed record under the configured tag.
        # When +source_host_key+ is set, addr[3] is stored in the record under
        # that key. A result lacking time or record is logged as a warning and
        # ends processing of this message.
        #
        # @param msg [String] raw message
        # @param addr [Array] peer address; element 3 is used as the host
        def on_message(msg, addr)
          @parser.parse(msg) do |time, record|
            if time && record
              record[@source_host_key] = addr[3] if @source_host_key
              router.emit(@tag, time, record)
            else
              log.warn "pattern not matched: #{msg.inspect}"
              return
            end
          end
        rescue => e
          log.error msg.dump, error: e, host: addr[3]
          log.error_backtrace
        end
      end
    end
  end
end