fluent/fluentd

View on GitHub
lib/fluent/plugin_helper/service_discovery/manager.rb

Summary

Maintainability
A
1 hr
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/plugin/service_discovery'
require 'fluent/plugin_helper/service_discovery/round_robin_balancer'

module Fluent
  module PluginHelper
    module ServiceDiscovery
      class Manager
        def initialize(log:, load_balancer: nil, custom_build_method: nil)
          @log = log
          @load_balancer = load_balancer || RoundRobinBalancer.new
          @custom_build_method = custom_build_method

          @discoveries = []
          @services = {}
          @queue = Queue.new
          @static_config = true
        end

        def configure(configs, parent: nil)
          configs.each do |config|
            type, conf = if config.has_key?(:conf) # for compatibility with initial API
                           [config[:type], config[:conf]]
                         else
                           [config['@type'], config]
                         end

            sd = Fluent::Plugin.new_sd(type, parent: parent)
            sd.configure(conf)

            sd.services.each do |s|
              @services[s.discovery_id] = build_service(s)
            end
            @discoveries << sd

            if @static_config && type.to_sym != :static
              @static_config = false
            end
          end

          rebalance
        end

        def static_config?
          @static_config
        end

        def start
          @discoveries.each do |d|
            d.start(@queue)
          end
        end

        %i[after_start stop before_shutdown shutdown after_shutdown close terminate].each do |mth|
          define_method(mth) do
            @discoveries.each do |d|
              d.__send__(mth)
            end
          end
        end

        def run_once
          # Don't care race in this loop intentionally
          s = @queue.size

          if s == 0
            return
          end

          s.times do
            msg = @queue.pop

            unless msg.is_a?(Fluent::Plugin::ServiceDiscovery::DiscoveryMessage)
              @log.warn("BUG: #{msg}")
              next
            end

            begin
              handle_message(msg)
            rescue => e
              @log.error(e)
            end
          end

          rebalance
        end

        def rebalance
          @load_balancer.rebalance(services)
        end

        def select_service(&block)
          @load_balancer.select_service(&block)
        end

        def services
          @services.values
        end

        private

        def handle_message(msg)
          service = msg.service

          case msg.type
          when Fluent::Plugin::ServiceDiscovery::SERVICE_IN
            if (n = build_service(service))
              @log.info("Service in: name=#{service.name} #{service.host}:#{service.port}")
              @services[service.discovery_id] = n
            else
              raise "failed to build service in name=#{service.name} #{service.host}:#{service.port}"
            end
          when Fluent::Plugin::ServiceDiscovery::SERVICE_OUT
            s = @services.delete(service.discovery_id)
            if s
              @log.info("Service out: name=#{service.name} #{service.host}:#{service.port}")
            else
              @log.warn("Not found service: name=#{service.name} #{service.host}:#{service.port}")
            end
          else
            @log.error("BUG: unknow message type: #{msg.type}")
          end
        end

        def build_service(n)
          @custom_build_method ? @custom_build_method.call(n) : n
        end
      end
    end
  end
end