fluent/fluentd

View on GitHub
lib/fluent/plugin/out_forward/socket_cache.rb

Summary

Maintainability
A
25 mins
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/plugin/output'

module Fluent::Plugin
  class ForwardOutput < Output
    # Pool of reusable sockets grouped by a caller-supplied key.
    #
    # Every tracked socket lives in exactly one of three containers:
    #
    # * @available_sockets - idle sockets that may be handed out again (key => [TimedSocket])
    # * @inflight_sockets  - sockets currently checked out (raw socket => TimedSocket)
    # * @inactive_sockets  - revoked sockets waiting to be closed by the next purge
    #
    # All public methods mutate this state inside @mutex.synchronize.
    class SocketCache
      # A pooled socket, the key it belongs to and the time at which it expires
      # (+timeout+ is nil when the cache was created without a timeout).
      TimedSocket = Struct.new(:timeout, :key, :sock)

      # @param timeout [Numeric, nil] seconds added to Time.now to compute a
      #   socket's expiry; nil means sockets never expire
      # @param log [#debug] logger used for debug messages
      def initialize(timeout, log)
        @log = log
        @timeout = timeout
        @available_sockets = Hash.new { |obj, k| obj[k] = [] }
        @inflight_sockets = {}
        @inactive_sockets = []
        @mutex = Mutex.new
      end

      # Returns an idle, non-expired socket for +key+, or registers the socket
      # produced by the given block when none is available. Either way the
      # returned socket is tracked as in-flight until #checkin or #revoke.
      #
      # NOTE: the block is called while the cache mutex is held.
      #
      # @param key [Object] pool key
      # @yieldreturn [Object] a newly created socket
      # @return [Object] the raw socket
      def checkout_or(key)
        @mutex.synchronize do
          tsock = pick_socket(key)

          if tsock
            tsock.sock
          else
            sock = yield
            new_tsock = TimedSocket.new(timeout, key, sock)
            @log.debug("connect new socket #{new_tsock}")

            @inflight_sockets[sock] = new_tsock
            new_tsock.sock
          end
        end
      end

      # Returns an in-flight socket to the pool of available sockets and
      # refreshes its expiry. Unknown sockets are only logged.
      #
      # @param sock [Object] a socket previously returned by #checkout_or
      def checkin(sock)
        @mutex.synchronize do
          if (s = @inflight_sockets.delete(sock))
            s.timeout = timeout
            @available_sockets[s.key] << s
          else
            @log.debug("there is no socket #{sock}")
          end
        end
      end

      # Marks an in-flight socket as unusable; it is closed by the next
      # #purge_obsolete_socks (or #clear). Unknown sockets are only logged.
      #
      # @param sock [Object] a socket previously returned by #checkout_or
      def revoke(sock)
        @mutex.synchronize do
          if (s = @inflight_sockets.delete(sock))
            @inactive_sockets << s
          else
            @log.debug("there is no socket #{sock}")
          end
        end
      end

      # Removes every expired available socket and every revoked socket from
      # the cache, then closes them outside the mutex. In-flight sockets are
      # never touched.
      #
      # @return [Array<TimedSocket>] the sockets that were removed
      def purge_obsolete_socks
        sockets = []

        @mutex.synchronize do
          # don't touch @inflight_sockets

          now = Time.now
          @available_sockets.each_value do |socks|
            # Split first, then rewrite the bucket: deleting from an array
            # while iterating over it skips the element after each deletion,
            # which left some expired sockets behind.
            expired, alive = socks.partition { |tsock| expired_socket?(tsock, time: now) }
            next if expired.empty?

            sockets.concat(expired)
            socks.replace(alive)
          end

          # reuse same object (@available_sockets)
          @available_sockets.reject! { |_, v| v.empty? }

          sockets.concat(@inactive_sockets)
          @inactive_sockets.clear
        end

        close_sockets(sockets)
      end

      # Forgets every socket known to the cache (available, in-flight and
      # revoked) and closes them outside the mutex.
      #
      # @return [Array<TimedSocket>] the sockets that were removed
      def clear
        sockets = []
        @mutex.synchronize do
          @available_sockets.each_value { |socks| sockets.concat(socks) }
          sockets.concat(@inflight_sockets.values)
          sockets.concat(@inactive_sockets)

          @available_sockets.clear
          @inflight_sockets.clear
          @inactive_sockets.clear
        end

        close_sockets(sockets)
      end

      private

      # Moves the first non-expired available socket for +key+ to the
      # in-flight table and refreshes its expiry.
      #
      # this method is not thread safe (callers must hold @mutex)
      #
      # @return [TimedSocket, nil] nil when no usable socket exists
      def pick_socket(key)
        # fetch bypasses the Hash default proc, so a pure lookup does not
        # leave an empty bucket behind for keys that were never checked in.
        socks = @available_sockets.fetch(key, nil)
        return nil if socks.nil? || socks.empty?

        t = Time.now
        s = socks.find { |tsock| !expired_socket?(tsock, time: t) }
        return nil unless s

        @inflight_sockets[s.sock] = socks.delete(s)
        s.timeout = timeout
        s
      end

      # @return [Time, nil] expiry for a socket (re)registered now; nil when
      #   the cache has no timeout
      def timeout
        @timeout && Time.now + @timeout
      end

      # @param sock [TimedSocket]
      # @param time [Time] reference time
      # @return [Boolean] true when the socket has an expiry earlier than +time+
      def expired_socket?(sock, time: Time.now)
        sock.timeout ? sock.timeout < time : false
      end

      # Closes the underlying socket of each entry, ignoring close failures
      # (best effort: the sockets are already removed from the cache).
      #
      # @param tsocks [Array<TimedSocket>]
      # @return [Array<TimedSocket>] +tsocks+
      def close_sockets(tsocks)
        tsocks.each do |tsock|
          begin
            tsock.sock.close
          rescue StandardError
            nil
          end
        end
      end
    end
  end
end