fluent/fluentd

View on GitHub
lib/fluent/plugin/out_forward/ack_handler.rb

Summary

Maintainability
B
6 hrs
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/plugin/output'
require 'fluent/plugin_helper/socket'
require 'fluent/engine'
require 'fluent/clock'

module Fluent::Plugin
  class ForwardOutput < Output
    class AckHandler
      # Outcome codes attached to each ack that #collect_response reports.
      module Result
        # The ack in the response equals the base64 chunk id that was recorded.
        SUCCESS = :success
        # No usable ack: the entry expired, the peer closed the connection,
        # or reading/parsing the response failed.
        FAILED = :failed
        # A response arrived but its 'ack' value differs from the recorded chunk id.
        CHUNKID_UNMATCHED = :chunkid_unmatched
      end

      # Sets up an empty list of sockets awaiting an ack.
      #
      # @param timeout [Numeric] added to Fluent::Clock.now when an entry is
      #   enqueued to compute its expiry time
      # @param log [Object] logger receiving the handler's log output
      # @param read_length [Integer] byte count requested from a socket per read
      def initialize(timeout:, log:, read_length:)
        @timeout = timeout
        @read_length = read_length
        @log = log

        # One unpacker instance is reused for every response read by this handler.
        @unpacker = Fluent::MessagePackFactory.msgpack_unpacker

        # Every access to @ack_waitings in this class happens under @mutex.
        @ack_waitings = []
        @mutex = Mutex.new
      end

      # Sweeps the waiting list once and reports every ack resolved in this pass.
      #
      # Expired entries are removed and reported as Result::FAILED. The sockets
      # of the remaining entries are handed to IO.select; each socket that turns
      # out to be readable is passed to #read_ack_from_sock and that outcome is
      # reported as well.
      #
      # Any StandardError raised after the clock has been sampled (including one
      # raised by the given block) is logged and not re-raised.
      #
      # @param select_interval [Numeric, nil] timeout passed straight to IO.select
      # @yield [chunk_id, node, sock, result] once per outcome; chunk_id, node and
      #   sock are all nil when the outcome carries no waiting entry
      def collect_response(select_interval)
        now = Fluent::Clock.now
        outcomes = []

        begin
          pending_sockets = @mutex.synchronize do
            expired, alive = @ack_waitings.partition { |entry| entry.expired?(now) }

            expired.each do |entry|
              # A node can stay silent for two reasons:
              # (1) it does not support sending responses at all, or
              # (2) it supports them, but the response has not arrived for some reason.
              @log.warn 'no response from node. regard it as unavailable.', host: entry.node.host, port: entry.node.port
              outcomes << [entry, Result::FAILED]
            end

            sockets = alive.map(&:sock)
            @ack_waitings = alive
            sockets
          end

          ready =
            begin
              IO.select(pending_sockets, nil, nil, select_interval)
            rescue IOError
              @log.info "connection closed while waiting for readable sockets"
              nil
            end

          # IO.select returns nil on timeout, otherwise [readable, writable, errored].
          readable = ready ? ready.first : []
          readable.each { |sock| outcomes << read_ack_from_sock(sock) }

          outcomes.each do |entry, result|
            if entry
              yield entry.chunk_id, entry.node, entry.sock, result
            else
              yield nil, nil, nil, result
            end
          end
        rescue => e
          @log.error 'unexpected error while receiving ack', error: e
          @log.error_backtrace
        end
      end

      # Record kept for each socket that is waiting for an ack: the socket, the
      # raw chunk id, its base64 text, the destination node and the expiry time.
      ACKWaitingSockInfo = Struct.new(:sock, :chunk_id, :chunk_id_base64, :node, :expired_time) do
        # @param now [Numeric] current time, on the same clock as expired_time
        # @return [Boolean] true only when +now+ is strictly later than expired_time
        def expired?(now)
          now > expired_time
        end
      end

      # Pairs a chunk id and its destination node with the handler that will
      # track the ack, so a caller only has to supply the socket later.
      Ack = Struct.new(:chunk_id, :node, :handler) do
        # Registers +sock+ with the handler for this chunk and node.
        #
        # @param sock [Object] socket the ack is expected on
        # @return [Object] whatever handler.enqueue returns
        def enqueue(sock)
          handler.enqueue(node, sock, chunk_id)
        end
      end

      # Builds an Ack bound to this handler for the given chunk and node.
      #
      # @param chunk_id [String] id of the chunk whose ack will be awaited
      # @param node [Object] destination node the chunk is sent to
      # @return [Ack]
      def create_ack(chunk_id, node)
        owner = self
        Ack.new(chunk_id, node, owner)
      end

      # Starts waiting for the ack of chunk +cid+ on +sock+.
      #
      # The entry expires @timeout after the current Fluent::Clock time and is
      # appended to the waiting list under the mutex.
      #
      # @param node [Object] destination node the chunk was sent to
      # @param sock [Object] socket the ack is expected on
      # @param cid [String] raw chunk id
      # @return [Array] the waiting list after the entry has been appended
      def enqueue(node, sock, cid)
        # Array#pack('m') produces exactly what Base64.encode64 does (60-byte
        # lines, each ending in a line feed), so the stored text still matches
        # the previous format while no longer depending on the base64 library,
        # which this file never requires.
        encoded_cid = [cid].pack('m')
        info = ACKWaitingSockInfo.new(sock, cid, encoded_cid, node, Fluent::Clock.now + @timeout)
        @mutex.synchronize do
          @ack_waitings << info
        end
      end

      private

      # Reads one ack response from +sock+ and classifies it.
      #
      # Whatever the outcome, the waiting entry found for +sock+ (if any) is
      # removed from the waiting list before this method returns.
      #
      # @param sock [Object] a socket reported readable by IO.select
      # @return [Array] a pair [info, result]:
      #   - [nil, Result::FAILED] when the socket was closed locally, no waiting
      #     entry exists for it, or an unexpected error occurred
      #   - [info, Result::FAILED] when the peer closed the connection
      #   - [info, Result::CHUNKID_UNMATCHED] when the ack differs from the chunk id
      #   - [info, Result::SUCCESS] when the ack matches
      def read_ack_from_sock(sock)
        begin
          raw_data =
            if sock.instance_of?(Fluent::PluginHelper::Socket::WrappedSocket::TLS)
              sock.readpartial(@read_length)
            else
              sock.recv(@read_length)
            end
        rescue Errno::ECONNRESET, EOFError # ECONNRESET for #recv, EOFError for #readpartial
          # Treated like an EOF read so the "closed by remote host" branch below handles it.
          raw_data = ''
        rescue IOError
          @log.info "socket closed while receiving ack response"
          return nil, Result::FAILED
        end

        info = find(sock)

        if info.nil?
          # The entry can be removed by another thread between the socket read and `find()`.
          # That is fine: the other thread has already finished processing this ack, so skip it.
          # Note: an exclusion mechanism around `collect_response()` may need to be considered.
          @log.debug "could not find the ack info. this ack may be processed by another thread."
          return nil, Result::FAILED
        elsif raw_data.empty?
          # When the remote host closes the connection the socket becomes readable and the read
          # returns an empty string (EOF). Assume the data was not delivered so it gets retried.
          @log.warn 'destination node closed the connection. regard it as unavailable.', host: info.node.host, port: info.node.port
          # info.node.disable!
          return info, Result::FAILED
        end

        begin
          @unpacker.feed(raw_data)
          res = @unpacker.read
        rescue
          # The unpacker is shared by every socket. If decoding fails (for example on a
          # truncated response) the bytes just fed would stay buffered and be prepended to
          # the next socket's response, so drop them before reporting the error.
          @unpacker.reset
          raise
        end

        @log.trace 'getting response from destination', host: info.node.host, port: info.node.port, chunk_id: dump_unique_id_hex(info.chunk_id), response: res
        if res['ack'] != info.chunk_id_base64
          # A differing ack suggests something went wrong, so the chunk should be sent again.
          @log.warn 'ack in response and chunk id in sent data are different', chunk_id: dump_unique_id_hex(info.chunk_id), ack: res['ack']
          return info, Result::CHUNKID_UNMATCHED
        end

        @log.trace 'got a correct ack response', chunk_id: dump_unique_id_hex(info.chunk_id)
        return info, Result::SUCCESS
      rescue => e
        @log.error 'unexpected error while receiving ack message', error: e
        @log.error_backtrace
        [nil, Result::FAILED]
      ensure
        # `info` is nil when no entry was looked up or found; deleting nil removes nothing.
        delete(info)
      end

      # Renders a chunk id for log output by delegating to Fluent::UniqueId.hex.
      #
      # @param unique_id [String] raw chunk id
      # @return [Object] whatever Fluent::UniqueId.hex returns for +unique_id+
      def dump_unique_id_hex(unique_id)
        Fluent::UniqueId.hex(unique_id)
      end

      # Looks up, under the mutex, the first waiting entry registered for +sock+.
      #
      # @param sock [Object] socket to search for (compared with ==)
      # @return [ACKWaitingSockInfo, nil] the matching entry, or nil when none exists
      def find(sock)
        @mutex.synchronize { @ack_waitings.find { |waiting| waiting.sock == sock } }
      end

      # Removes, under the mutex, every waiting entry equal to +info+.
      #
      # @param info [ACKWaitingSockInfo, nil] entry to drop; nil removes nothing
      #   as long as the list holds no nil
      # @return [Object, nil] the removed entry, or nil when nothing matched
      def delete(info)
        @mutex.synchronize { @ack_waitings.delete(info) }
      end
    end
  end
end